Как записать полученный RDD в csv файл в Spark python


У меня есть результирующий RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions). Это имеет выходные данные в следующем формате:

[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
Я хочу создать CSV-файл с одним столбцом для labels (Первая часть кортежа в приведенном выше выводе) и одним для predictions(вторая часть вывода кортежа). Но я не знаю, как писать в CSV-файл в Spark с помощью Python.

Как я могу создать CSV-файл с указанным выше выводом?

3 20

3 ответа:

Просто map строки RDD (labelsAndPredictions) в строки (строки CSV) затем используйте rdd.saveAsTextFile().

def toCSVLine(data):
  return ','.join(str(d) for d in data)

lines = labelsAndPredictions.map(toCSVLine)
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')

Я знаю, что это Старая Почта. Но чтобы помочь кому-то искать то же самое, вот как я пишу два столбца RDD в один CSV-файл в PySpark 1.6.2

РДД:

>>> rdd.take(5)
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]

Теперь код:

# First I convert the RDD to dataframe
from pyspark import SparkContext
df = sqlContext.createDataFrame(rdd, ['count', 'word'])

ДФ:

>>> df.show()
+-----+-----------+
|count|       word|
+-----+-----------+
|73342|      cells|
|62861|       cell|
|61714|    studies|
|61377|        aim|
|60168|   clinical|
|59275|          2|
|59221|          1|
|58274|       data|
|58087|development|
|56579|     cancer|
|50243|    disease|
|49817|   provided|
|49216|   specific|
|48857|     health|
|48536|      study|
|47827|    project|
|45573|description|
|45455|  applicant|
|44739|    program|
|44522|   patients|
+-----+-----------+
only showing top 20 rows

Теперь напишите в CSV

# Write CSV (I have HDFS storage)
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')

P. S: Я всего лишь новичок, изучающий сообщения здесь, в Stackoverflow. Так что я не знаю, является ли это лучшим способом. Но это сработало для меня, и я надеюсь, что это поможет кому-то!

Нехорошо просто соединять запятыми, потому что если поля содержат запятые, они не будут правильно цитироваться, например ','.join(['a', 'b', '1,2,3', 'c']) дает вам a,b,1,2,3,c, Когда вы хотите a,b,"1,2,3",c. Вместо этого вы должны использовать модуль csv Python для преобразования каждого списка в RDD в правильно отформатированную строку csv:

# python 3
import csv, io

def list_to_csv_str(x):
    """Given a list of strings, returns a properly-csv-formatted string."""
    output = io.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip() # remove extra newline

# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")

Поскольку модуль csv записывает только в файловые объекты, мы должны создать пустой "файл" с io.StringIO("") и сообщить csv.writer для записи в него строки в формате csv. Затем мы используем output.getvalue(), чтобы получить строку, которую мы просто написал в "файл". Чтобы этот код работал с Python 2, просто замените io модулем StringIO.

Если вы используете API Spark DataFrames, вы также можете заглянуть в функцию DataBricks save, которая имеет формат csv.