Рекомендуется запрашивать большое количество объектов ndb из хранилища данных


я столкнулся с интересным ограничением с хранилищем данных App Engine. Я создаю обработчик, чтобы помочь нам проанализировать некоторые данные об использовании на одном из наших рабочих серверов. Для выполнения анализа мне нужно запросить и суммировать 10 000 + сущностей, извлеченных из хранилища данных. Вычисление не сложно, это просто гистограмма элементов, которые проходят определенный фильтр образцов использования. Проблема, с которой я столкнулся, заключается в том, что я не могу получить данные из хранилища данных достаточно быстро, чтобы выполнить любую обработку раньше попадание в крайний срок запроса.

Я пробовал все, что я могу придумать, чтобы разбить запрос на параллельные вызовы RPC для повышения производительности, но в соответствии с appstats я не могу заставить запросы фактически выполняться параллельно. Независимо от того, какой метод я пытаюсь (см. ниже), всегда кажется, что RPC возвращается к водопаду последовательных следующих запросов.

Примечание: запрос и анализ кода работает, он просто работает медленно, потому что я не могу получить данные достаточно быстро от хранилище данных.

фон

у меня нет живой версии, которую я могу поделиться, но вот базовая модель для той части системы, о которой я говорю:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

вы можете думать о образцах как раз, когда пользователь использует возможности данного имени. (например: 'система.feature_x'). Теги основаны на данных клиента, системной информации и функции. например: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). Итак, теги формируют денормализованный набор маркеров, которые могут быть использованы для поиска образцов, представляющих интерес.

анализ, который я пытаюсь сделать, состоит в том, чтобы взять диапазон дат и спросить, сколько раз была функция набора функций (возможно, все функции), используемых в день (или в час) на счет клиента (компания, а не на пользователя).

так что вход в обработчик будет что-то вроде:

  • Дата Начала
  • Дата Окончания
  • тега(ов)

выход было бы:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

общий код для запросов

вот некоторый код, общий для всех запросов. Общая структура обработчика-это простой обработчик get с использованием webapp2, который устанавливает параметры запроса, запускает запрос, обрабатывает результаты, создает данные для возврата.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Методы Пробовал

Я пробовал различные методы, чтобы попытаться извлечь данные из хранилища данных как можно быстро и параллельно. Методы, которые я пробовал до сих пор включают в себя:

A. Одна Итерация

это более простой базовый вариант для сравнения с другими методами. Я просто строю запрос и повторяю все элементы, позволяя ndb делать то, что он делает, чтобы вытащить их один за другим.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Большая Выборка

идея здесь заключалась в том, чтобы посмотреть, смогу ли я сделать одну очень большую выборку.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. асинхронная выборка во временном диапазоне

идея в том, чтобы признайте, что образцы довольно хорошо разнесены по времени, поэтому я могу создать набор независимых запросов, которые разделяют общую область времени на куски и пытаются запустить каждый из них параллельно с помощью async:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. асинхронное отображение

я попробовал этот метод, потому что документация заставила его звучать так, как будто ndb может использовать некоторый параллелизм автоматически при использовании запроса.метод map_async.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

исход

Я проверил один пример запроса для сбора общего времени отклика и трассировки appstats. Результаты таковы:

A. Одна Итерация

real: 15.645 s

это происходит последовательно через выборку пакетов один за другим, а затем извлекает каждый сеанс из memcache.

B. Большая Выборка

real: 12.12 s

эффективно то же самое, что и вариант A, но по какой-то причине немного быстрее.

C. Асинхронная выборка во временном диапазоне

real: 15.251 s

кажется, что обеспечивает больше параллелизма в начале, но, похоже, замедляется последовательностью вызовов next во время итерации результатов. Также, похоже, не может перекрывать поисковые запросы memcache сеанса с ожидающими запросами.

D. асинхронное отображение

real: 13.752 s

это один из самых трудных для меня, чтобы понять. Похоже, это вопрос хороший дело в перекрытии, но все, кажется, растягивается в водопад, а не параллельно.

рекомендации

основываясь на всем этом, что я упускаю? Я просто нажимаю ограничение на движок приложений или есть лучший способ снять большое количество объектов параллельно?

Я в недоумении, что делать дальше. Я думал о переписывании клиента, чтобы сделать несколько запросов к движку приложений параллельно, но это кажется довольно грубым сила. Я бы действительно ожидал, что App engine должен быть в состоянии справиться с этим случаем использования, поэтому я предполагаю, что мне чего-то не хватает.

обновление

в конце концов я обнаружил, что параметр C был лучшим для моего случая. Я смог оптимизировать его, чтобы завершить за 6,1 секунды. Еще не идеально, но намного лучше.

после получения совета от нескольких человек, я обнаружил, что следующие пункты были ключевыми, чтобы понять и иметь в виду:

  • несколько запросы могут выполняться параллельно
  • только 10 RPC могут быть в полете сразу
  • попробуйте денормализовать до такой степени, что нет никаких вторичных запросов
  • этот тип задачи лучше оставить для отображения сокращения и очередей задач, а не запросов в реальном времени

Итак, что я сделал, чтобы сделать это быстрее:

  • я разделил пространство запроса с самого начала на основе времени. (Примечание: чем более равны разделы с точки зрения сущностей вернулся, тем лучше)
  • я еще денормализованные данные, чтобы избавиться от необходимости вторичного запроса сеанса
  • я использовал асинхронные операции ndb и wait_any () для перекрытия запросов с обработкой

Я все еще не получаю производительность, которую я ожидал бы или хотел, но это выполнимо на данный момент. Я просто хочу, чтобы их был лучший способ быстро вытащить большое количество последовательных объектов в память в обработчиках.

4 62

4 ответа:

большая обработка, как это не должно быть сделано в запросе пользователя, который имеет ограничение по времени 60s. Вместо этого это должно быть сделано в контексте, который поддерживает длительные запросы. Элемент задач очередь поддерживает запросы до 10 минут, и (я считаю) нормальные ограничения памяти (экземпляры F1, по умолчанию, имеют 128 МБ памяти). Для еще более высоких пределов (без тайм-аута запроса, 1 ГБ+ памяти) используйте backends.

вот что нужно попробовать: настроить a URL-адрес, который при доступе запускает задачу очереди задач. Он возвращает веб-страницу, которая опрашивает каждые ~5s на другой URL-адрес, который отвечает true / false, если задача очереди задач еще не завершена. Очередь задач обрабатывает данные, что может занять около 10 секунд, и сохраняет результат в хранилище данных либо в виде вычисленных данных, либо в виде отрисованной веб-страницы. Как только начальная страница обнаруживает, что она завершена, пользователь перенаправляется на страницу, которая извлекает теперь вычисленные результаты из хранилище данных.

новый экспериментальный Обработка Данных функция (API AppEngine для MapReduce) выглядит очень подходящей для решения этой проблемы. Он выполняет автоматическое сегментирование для выполнения нескольких параллельных рабочих процессов.

Большие операции с данными на движке приложений лучше всего реализовать с помощью какой-то операции mapreduce.

вот видео, описывающее процесс, но в том числе BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

не похоже, что вам нужен BigQuery, но вы, вероятно, хотите использовать как карту, так и уменьшить части конвейера.

основное различие между тем, что вы делаете, и ситуацией mapreduce заключается в том, что вы запускаете один экземпляр и повторяете запросы, где на mapreduce у вас будет отдельный экземпляр, работающий параллельно для каждого запроса. Вам понадобится операция сокращения ,чтобы" суммировать " все данные и записать результат где-то.

другая проблема заключается в том, что вы должны использовать курсоры для итерации. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

Если итератор использует запрос смещение, это будет неэффективно, так как смещение выдает тот же запрос, пропускает ряд результатов и дает вам следующий набор, в то время как курсор переходит прямо к следующему набору.

у меня есть аналогичная проблема, и после работы с поддержкой Google в течение нескольких недель я могу подтвердить, что нет никакого волшебного решения, по крайней мере, по состоянию на декабрь 2017 года.

tl; dr: можно ожидать пропускную способность от 220 entities / second для стандартного SDK, работающего на экземпляре B1 до 900 entities / second для исправленного SDK, работающего на экземпляре B8.

ограничение связано с ЦП и изменение экземпляра типа напрямую влияет спектакль. Это подтверждается аналогичными результатами, полученными на экземплярах B4 и B4_1G

лучшая пропускная способность, которую я получил для объекта Expando с примерно 30 полями:

стандартный GAE SDK

  • B1 экземпляр: ~220 объектов / сек
  • экземпляр B2: ~250 сущностей в секунду
  • B4 экземпляр: ~560 объектов / сек
  • b4_1g экземпляр: ~560 объектов / сек
  • экземпляр B8: ~650 сущности / секунда

исправлено GAE SDK

  • B1 экземпляр: ~420 объектов / сек
  • экземпляр B8: ~900 сущностей в секунду

для стандартного GAE SDK я пробовал различные подходы, включая многопоточность, но лучшим оказался fetch_async С wait_any. Текущая библиотека NDB уже отлично справляется с использованием async и futures под капотом, поэтому любая попытка подтолкнуть это только с помощью потоков сделать еще хуже.

Я нашел два интересных подхода для оптимизации этого:

Мэтт Фаус очень хорошо объясняет проблему:

GAE SDK предоставляет API для чтения и записи объектов, полученных из ваши классы в хранилище данных. Это экономит вам скучную работу проверка необработанных данных, возвращенных из хранилища данных, и их переупаковка в простой в использовании объект. В частности, GAE использует буферы протокола для передачи необработанных данных из хранилища на интерфейсную машину, которая нуждается оно. Затем SDK отвечает за декодирование этого формата и возврат чистый объект для вашего кода. Эта утилита отличная, но иногда она делает немного больше работы, чем вы хотели бы. [... Используя наш профилирования инструмент, я обнаружил, что полностью 50% о времени, потраченном на их получение сущности были во время фазы декодирования protobuf-to-python-object. Этот означает, что процессор на интерфейсном сервере был узким местом в этих хранилище данных читает!

GAE-data-access-web-request

оба подхода пытаются сократить время, затрачиваемое на декодирование protobuf в Python, уменьшая количество декодированных полей.

Я пробовал оба подхода, но мне удалось только с matt's. SDK internals изменилось с тех пор, как Эван опубликовал его решение. Мне пришлось немного изменить код, опубликованный Мэттом здесь, но это было довольно легко - если есть интерес я могу опубликовать окончательный код.

для обычного объекта Expando с примерно 30 полями я использовал решение Мэтта для декодирования только пары полей и получил значительное улучшение.

В заключение нужно планировать соответственно и не ожидать, что удастся обработать гораздо больше, чем несколько сотен объектов в запросе GAE "в реальном времени".