Как записать полученный RDD в csv файл в Spark python
У меня есть результирующий RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
. Это имеет выходные данные в следующем формате:
[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
Я хочу создать CSV-файл с одним столбцом для labels
(Первая часть кортежа в приведенном выше выводе) и одним для predictions
(вторая часть вывода кортежа). Но я не знаю, как писать в CSV-файл в Spark с помощью Python.
Как я могу создать CSV-файл с указанным выше выводом?
3 ответа:
Просто
map
строки RDD (labelsAndPredictions
) в строки (строки CSV) затем используйтеrdd.saveAsTextFile()
.def toCSVLine(data): return ','.join(str(d) for d in data) lines = labelsAndPredictions.map(toCSVLine) lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
Я знаю, что это Старая Почта. Но чтобы помочь кому-то искать то же самое, вот как я пишу два столбца RDD в один CSV-файл в PySpark 1.6.2
РДД:
>>> rdd.take(5) [(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]
Теперь код:
# First I convert the RDD to dataframe from pyspark import SparkContext df = sqlContext.createDataFrame(rdd, ['count', 'word'])
ДФ:
>>> df.show() +-----+-----------+ |count| word| +-----+-----------+ |73342| cells| |62861| cell| |61714| studies| |61377| aim| |60168| clinical| |59275| 2| |59221| 1| |58274| data| |58087|development| |56579| cancer| |50243| disease| |49817| provided| |49216| specific| |48857| health| |48536| study| |47827| project| |45573|description| |45455| applicant| |44739| program| |44522| patients| +-----+-----------+ only showing top 20 rows
Теперь напишите в CSV
# Write CSV (I have HDFS storage) df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
P. S: Я всего лишь новичок, изучающий сообщения здесь, в Stackoverflow. Так что я не знаю, является ли это лучшим способом. Но это сработало для меня, и я надеюсь, что это поможет кому-то!
Нехорошо просто соединять запятыми, потому что если поля содержат запятые, они не будут правильно цитироваться, например
','.join(['a', 'b', '1,2,3', 'c'])
дает вамa,b,1,2,3,c
, Когда вы хотитеa,b,"1,2,3",c
. Вместо этого вы должны использовать модуль csv Python для преобразования каждого списка в RDD в правильно отформатированную строку csv:# python 3 import csv, io def list_to_csv_str(x): """Given a list of strings, returns a properly-csv-formatted string.""" output = io.StringIO("") csv.writer(output).writerow(x) return output.getvalue().strip() # remove extra newline # ... do stuff with your rdd ... rdd = rdd.map(list_to_csv_str) rdd.saveAsTextFile("output_directory")
Поскольку модуль csv записывает только в файловые объекты, мы должны создать пустой "файл" с
io.StringIO("")
и сообщить csv.writer для записи в него строки в формате csv. Затем мы используемoutput.getvalue()
, чтобы получить строку, которую мы просто написал в "файл". Чтобы этот код работал с Python 2, просто замените io модулем StringIO.Если вы используете API Spark DataFrames, вы также можете заглянуть в функцию DataBricks save, которая имеет формат csv.