Рабочие потоки "больших данных" с использованием панд


Я пытался разгадать ответ на этот вопрос в течение многих месяцев, изучая панд. Я использую SAS для своей повседневной работы, и это отлично подходит для его поддержки вне ядра. Однако SAS ужасен как часть программного обеспечения по многим другим причинам.

однажды я надеюсь заменить свое использование SAS на python и pandas, но в настоящее время мне не хватает внеядерного рабочего процесса для больших наборов данных. Я не говорю о "больших данных", которые требуют распределенной сети, а скорее файлы слишком большой, чтобы поместиться в памяти, но достаточно маленький, чтобы поместиться на жестком диске.

моя первая мысль-использовать HDFStore чтобы держать большие наборы данных на диске и тянуть только те части, которые мне нужны в dataframes для анализа. Другие упоминали MongoDB как более простую в использовании альтернативу. Мой вопрос таков:

Каковы некоторые наиболее эффективные рабочие процессы для выполнения следующих задач:

  1. загрузка плоских файлов в постоянную базу данных на диске структура
  2. запрос этой базы данных для получения данных для подачи в структуру данных pandas
  3. обновление базы данных после манипулирования кусками в панд

реальные примеры были бы очень признательны, особенно от тех, кто использует панд на "больших данных".

Edit -- пример того, как я хотел бы, чтобы это работало:

  1. итеративно импортировать большой плоский файл и хранить его в постоянной базе данных на диске структура. Эти файлы, как правило, слишком велики, чтобы поместиться в памяти.
  2. чтобы использовать Pandas, я хотел бы прочитать подмножества этих данных (обычно всего несколько столбцов за раз), которые могут поместиться в памяти.
  3. Я бы создал новые столбцы, выполнив различные операции над выбранными столбцами.
  4. затем мне придется добавить эти новые столбцы в структуру базы данных.

Я пытаюсь найти лучший способ выполнения этих лестница. Чтение ссылок о панд и pytables кажется, что добавление нового столбца может быть проблемой.

Edit -- отвечая на вопросы Джеффа конкретно:

  1. Я строю модели риска потребительского кредита. Виды данных включают характеристики телефона, SSN и адреса; стоимость имущества; уничижительную информацию, такую как уголовные дела, банкротства и т. д... Наборы данных, которые я использую каждый день, имеют в среднем от 1000 до 2000 полей смешанных типов данных: непрерывные, номинальные и порядковые переменные как числовых, так и символьных данных. Я редко добавляю строки, но я выполняю много операций, которые создают новые столбцы.
  2. типичные операции включают объединение нескольких столбцов с использованием условной логики в новый составной столбец. Например, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'. Результатом этих операций является новый столбец для каждой записи в наборе данных.
  3. наконец, я хотел бы добавить эти новые столбцы в структуру данных на диске. Я бы повторите шаг 2, исследуя данные с помощью перекрестных таблиц и описательной статистики, пытаясь найти интересные, интуитивно понятные отношения к модели.
  4. стандартный файл проекта обычно составляет около 1 ГБ. Файлы организованы таким образом, что строка состоит из записи потребительских данных. Каждая строка имеет одинаковое количество столбцов для каждой записи. Так будет всегда.
  5. это довольно редко, что я бы подмножество строк при создании нового столбца. Тем не менее, это довольно обычно для меня подмножество строк при создании отчетов или создании описательной статистики. Например, я мог бы создать простую частоту для определенной линии бизнеса, скажем, розничных кредитных карт. Для этого я бы выбрал только те записи, где линия бизнеса = розничная торговля в дополнение к тем столбцам, о которых я хочу сообщить. Однако при создании новых столбцов я бы вытащил все строки данных и только те столбцы, которые мне нужны для операций.
  6. процесс моделирования требуется, чтобы я анализировал каждый столбец, искал интересные отношения с некоторой переменной результата и создавал новые составные столбцы, которые описывают эти отношения. Столбцы, которые я исследую, обычно делаются в небольших наборах. Например, я сосредоточусь на наборе, скажем, 20 столбцов, которые просто касаются значений свойств и наблюдают, как они относятся к дефолту по кредиту. Как только они будут изучены и будут созданы новые столбцы, я перейду к другой группе столбцов, скажем, к колледжскому образованию, и повторить процесс. То, что я делаю, - это создание переменных-кандидатов, которые объясняют связь между моими данными и некоторым результатом. В самом конце этого процесса я применяю некоторые методы обучения, которые создают уравнение из этих составных столбцов.

Я редко добавляю строки в набор данных. Я почти всегда буду создавать новые столбцы (переменные или функции в статистике/машинном обучении).

13 757

13 ответов:

Я обычно использую десятки гигабайт данных именно таким образом например, у меня есть таблицы на диске, которые я читаю через запросы, создаю данные и добавляю обратно.

это стоит прочитать документы и поздно в этой теме несколько советов о том, как хранить ваши данные.

детали, которые будут влиять на то, как вы храните свои данные, например:
дать как много деталь как вы можете, и я могу помочь вам разработать структура.

  1. размер данных, # строк, столбцов, типов столбцов; вы добавляете строки или просто столбцы?
  2. как будут выглядеть типичные операции. Например, сделайте запрос на столбцы, чтобы выбрать кучу строк и определенных столбцов, затем выполните операцию (в памяти), создайте новые столбцы, сохраните их.
    (Приведение игрушечного примера может позволить нам предложить более конкретные рекомендации.)
  3. после этой обработки, то что вы делаете? Это шаг 2 специальный или повторяемый?
  4. входные плоские файлы: сколько, грубый общий размер в ГБ. Как они организованы, например, по записям? Содержит ли каждый из них разные поля, или у них есть несколько записей в файле со всеми полями в каждом файле?
  5. вы когда-нибудь выбирали подмножества строк (записей) на основе критериев (например, выберите строки С полем A > 5)? а затем что-то сделать, или вы просто выбираете поля A, B, C со всеми записями (а затем делаете что-то)?
  6. вы "работаете" над всеми своими столбцами (в группах), или есть хорошая пропорция, которую вы можете использовать только для отчетов (например, вы хотите сохранить данные вокруг, но не нужно тянуть в этом столбце эксплицитность до окончательного времени результатов)?

решение

убедитесь, что у вас есть панды по крайней мере 0.10.1 установлен.

читать переборе файлов, кусок-на-кусок и несколько табличные запросы.

поскольку pytables оптимизирован для работы по строкам (это то, что вы запрашиваете), мы создадим таблицу для каждой группы полей. Таким образом, легко выбрать небольшую группу полей (которая будет работать с большой таблицей, но это более эффективно сделать таким образом... Я думаю, что смогу исправить это ограничение в будущем... это более интуитивно так или иначе):
(Ниже приведен псевдокод.)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

чтение в файлах и создание хранение (по существу делает то, что append_to_multiple нет):

for f in files:
   # read in the file, additional options hmay be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

теперь у вас есть все таблицы в файле (на самом деле вы можете хранить их в отдельных файлах, если хотите, вам придется добавить имя файла в group_map, но, вероятно, это не обязательно).

вот как вы получаете столбцы и создаете новые:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

когда вы будете готовы к post_processing:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

о data_columns, вам на самом деле не нужно определять любой data_columns; они позволяют вам выбирать строки на основе столбца. Например, что-то вроде:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

они могут быть наиболее интересны для вас на заключительном этапе создания отчета (по существу, столбец данных отделен от других столбцов, что может несколько повлиять на эффективность, если вы определяете много).

вы также можете захотеть:

  • создайте функцию, которая берет список полей, ищет группы в groups_map, затем выбирает их и объединяет результаты, чтобы вы получили результирующий кадр (это по существу то, что делает select_as_multiple). таким образом, структура будет очень прозрачным.
  • индексы на некоторых столбцах данных (делает подмножество строк намного быстрее).
  • включить сжатие.

Дай мне знать, когда у вас есть вопросы!

Я думаю, что ответы выше отсутствуют простой подход, который я нашел очень полезным.

когда у меня есть файл, который слишком велик для загрузки в память, я разбиваю файл на несколько файлов меньшего размера (либо по строке, либо по cols)

пример: в случае 30-дневного объема торговых данных размером ~30 Гб я разбиваю его на файл в день размером ~1 ГБ. Впоследствии я обрабатываю каждый файл отдельно и агрегирую результаты в конце

одно из самых больших преимуществ что он позволяет параллельную обработку файлов (либо несколько потоков или процессов)

другое преимущество заключается в том, что манипуляции с файлами (например, добавление / удаление дат в примере) могут выполняться обычными командами оболочки, что невозможно в более продвинутых/сложных форматах файлов

этот подход не охватывает все сценарии, но очень полезен во многих из них

Если ваши наборы данных находятся между 1 и 20 ГБ, вы должны получить рабочую станцию с 48 ГБ оперативной памяти. Тогда панды могут держать весь набор данных в оперативной памяти. Я знаю, что это не тот ответ, который вы ищете здесь, но делать научные вычисления на ноутбуке с 4 ГБ ОЗУ не разумно.

теперь, через два года после вопроса, есть эквивалент "вне ядра" панды:ДАСК. Это превосходно! Хотя он не поддерживает все функции pandas, вы можете получить действительно далеко с ним.

Я знаю, что это старый нить, но я думаю Blaze библиотека стоит проверить. Он построен для таких ситуаций.

документы:

Blaze расширяет удобство использования NumPy и Pandas для распределенных и внеядерных вычислений. Blaze предоставляет интерфейс, аналогичный интерфейсу NumPy ND-Array или Pandas DataFrame, но отображает эти знакомые интерфейсы на множество других вычислительных механизмов, таких как Postgres или Искра.

Edit: кстати, его поддерживают ContinuumIO и Трэвис Олифант, автор NumPy.

это дело для pymongo. Я также прототипировал с помощью sql server, sqlite, HDF, ORM (SQLAlchemy) в python. В первую очередь pymongo-это БД на основе документов, поэтому каждый человек будет документом (dict атрибуты). Многие люди формируют коллекцию, и у вас может быть много коллекций (люди, фондовый рынок, доход).

pd.dateframe - > pymongo Примечание: я использую chunksize на read_csv чтобы сохранить его до 5 до 10k записей(pymongo сбрасывает сокет, если крупнее)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

запрос: gt = больше, чем...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() возвращает итератор, поэтому я обычно использую ichunked нарезать на более мелкие итераторы.

как насчет соединения, так как я обычно получаю 10 источников данных для вставки вместе:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

затем (в моем случае иногда приходится агг на aJoinDF сначала перед его "слиянием".)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

и затем вы можете записать новую информацию в свою основную коллекцию через обновление метод ниже. (логическая коллекция против физических источников данных).

collection.update({primarykey:foo},{key:change})

при меньшем поиске просто денормализуйте. Например, у вас есть код в документе, и вы просто добавляете текст кода поля и делаете dict поиск при создании документов.

теперь у вас есть хороший набор данных, основанный на человеке, вы можете раскрыть свою логику в каждом случае и сделать больше атрибутов. Наконец, вы можете прочитать в панд ваши 3 в память Макс ключевых показателей и сделать повороты/agg/исследования данных. Это работает для меня для 3 миллионов записей с номерами / большим текстом/категориями/кодами/поплавками/...

вы также можете использовать два метода, встроенные в MongoDB (MapReduce и aggregate framework). смотрите здесь для получения дополнительной информации о aggregate framework, так как это кажется проще, чем MapReduce и выглядит удобно для быстрой агрегатной работы. Обратите внимание, что мне не нужно было определять мои поля или отношения, и я могу добавлять элементы в документ. В текущем состоянии быстро меняется numpy, панды, python toolset, MongoDB помогает мне просто приступить к работе:)

Я заметил это немного поздно, но я работаю с аналогичной проблемой (модели предоплаты ипотеки). Мое решение состояло в том, чтобы пропустить слой pandas HDFStore и использовать прямые pytables. Я сохраняю каждый столбец как отдельный массив HDF5 в моем окончательном файле.

мой основной рабочий процесс, чтобы сначала получить CSV-файл из базы данных. Я gzip его, так что это не так огромно. Затем я конвертирую это в ориентированный на строку файл HDF5, повторяя его в python, Преобразуя каждую строку в реальный тип данных и записывая это к файлу HDF5. Это занимает несколько десятков минут, но он не использует никакой памяти, так как он работает только строка за строкой. Потом "перенести" с построчным HDF5 файл в столбец-ориентированной HDF5 файл.

таблица транспонирования выглядит так:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

чтение его обратно в то выглядит так:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

теперь я обычно запускаю это на машине с тонной памяти, поэтому я не могу быть достаточно осторожным с использованием памяти. Например, по умолчанию операция загрузки считывает весь набор данных.

это обычно работает для меня, но это немного неуклюже, и я не могу использовать причудливую магию pytables.

Edit: реальное преимущество этого подхода, по сравнению с массивом записей pytables по умолчанию, заключается в том, что я могу загрузить данные в R с помощью h5r, который не может обрабатывать таблицы. Или, по крайней мере, я не смог заставить его загружать разнородные таблицы.

еще одна вариация

многие операции, выполняемые в pandas, также могут быть выполнены как запрос БД (sql, mongo)

использование СУБД или mongodb позволяет выполнять некоторые агрегации в запросе БД (который оптимизирован для больших данных и эффективно использует кэш и индексы)

позже вы можете выполнить постобработку с помощью панд.

преимущество этого метода заключается в том, что вы получаете оптимизацию БД для работы с большими данными, в то время как все еще определяя логику в декларативном синтаксисе высокого уровня - и не имея дела с деталями решения, что делать в памяти и что делать вне ядра.

и хотя язык запросов и панды отличаются, обычно не сложно перевести часть логики от одного к другому.

один трюк, который я нашел полезным для случаев использования" больших данных",-это уменьшить объем данных, уменьшив точность float до 32-бит. Это применимо не во всех случаях, но во многих приложениях 64-битная точность является излишней, и экономия памяти 2x стоит того. Чтобы сделать очевидный момент еще более очевидным:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

как отмечают другие, через несколько лет появился эквивалент "вне ядра" панд:ДАСК. Хотя dask не является выпадающей заменой панд и вся его функциональность выделяется по нескольким причинам:

Dask-это гибкая параллельная вычислительная библиотека для аналитических вычислений, оптимизированная для динамического планирования задач для интерактивных вычислительных нагрузок Коллекции "больших данных", такие как параллельные массивы, фреймы данных и списки, расширяющие общие интерфейсы, такие как итераторы NumPy, Pandas или Python для больших, чем память, или распределенных сред и масштабируются от ноутбуков до кластеров.

ДАСК подчеркивает следующие достоинства:

  • знакомый: предоставляет распараллеленный массив NumPy и объекты DataFrame Pandas
  • гибкий: обеспечивает интерфейс планирования задач для более пользовательских рабочих нагрузок и интеграции с другими проектами.
  • родной: включает распределенный вычисления в чистом Python с доступом к стеку PyData.
  • Fast: работает с низкими накладными расходами, низкой задержкой и минимальной сериализацией, необходимой для быстрых численных алгоритмов
  • масштабирование: работает упруго на кластерах с 1000 ядер масштабирование вниз: тривиально настроить и запустить на ноутбуке в одном процессе
  • отзывчивый: разработанный с учетом интерактивных вычислений он обеспечивает быструю обратную связь и диагностику, чтобы помочь люди

и добавить простой пример кода:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

заменяет некоторые панды такой код:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

и, что особенно примечательно, обеспечивает через параллельный.фьючерсы интерфейс, общий для представления пользовательских задач:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

рассмотрим Ruffus Если вы идете по простому пути создания конвейера данных, который разбит на несколько небольших файлов.

здесь стоит упомянуть Рэй также,
это распределенная вычислительная структура, которая имеет свою собственную реализацию для панд распределенным способом.

просто замените импорт панд, и код должен работать как есть:

# import pandas as pd
import ray.dataframe as pd

#use pd as usual

можно прочитать более подробную информацию здесь:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/

недавно я столкнулся с подобной проблемой. Я обнаружил, что просто читаю данные в кусках и добавляю их, когда я пишу их в кусках в тот же csv, хорошо работает. Моя проблема заключалась в добавлении столбца даты на основе информации в другой таблице, используя значение определенных столбцов следующим образом. Это может помочь тем, кто смущен dask и hdf5, но более знаком с пандами, такими как я.

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)