Как распечатать содержимое RDD?


Я пытаюсь распечатать содержимое коллекции на консоль Spark.

у меня есть типа:

linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]

и я использую команду:

scala> linesWithSessionId.map(line => println(line))

но это напечатано :

res1: org.апаш.искра.РДУ.RDD[Unit] = MappedRDD[4] на карте в :19

Как я могу записать RDD на консоль или сохранить его на диск, чтобы я мог просматривать его содержимое?

8 99

8 ответов:

если вы хотите просмотреть содержимое RDD, один из способов-использовать collect():

myRDD.collect().foreach(println)

это не очень хорошая идея, хотя, когда RDD имеет миллиарды строк. Используйте take() чтобы взять только несколько, чтобы распечатать:

myRDD.take(n).foreach(println)

The

Если вы используете это на кластере, то println не будет печатать обратно в контекст. Вы должны принести RDD данные в сессии. Для этого вы можете заставить его к локальному массиву, а затем распечатать его:

linesWithSessionId.toArray().foreach(line => println(line))

вы можете обменять ваш RDD до DataFrame затем show() его.

// For implicit conversion from RDD to DataFrame
import spark.implicits._

fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)])

// convert to DF then show it
fruits.toDF().show()

Это покажет верхние 20 строк ваших данных, поэтому размер ваших данных не должен быть проблемой.

+------+---+                                                                    
|    _1| _2|
+------+---+
| apple|  1|
|banana|  2|
|orange| 17|
+------+---+

есть, вероятно, много архитектурных различий между myRDD.foreach(println) и myRDD.collect().foreach(println) (не только "собирать", но и другие действия). Одна из различий, которые я видел, это когда делать myRDD.foreach(println) выход будет в случайном порядке. Например: если мой rdd поступает из текстового файла, где каждая строка имеет номер, выход будет иметь другой порядок. Но когда я это сделал myRDD.collect().foreach(println), порядок остается таким же, как и текстовый файл.

в python

   linesWithSessionIdCollect = linesWithSessionId.collect()
   linesWithSessionIdCollect

это распечатает все содержимое RDD

вы также можете сохранить в виде файла:rdd.saveAsTextFile("alicia.txt")

вместо того, чтобы печатать каждый раз, вы можете;

[1]создайте общий метод печати внутри оболочки Spark.

def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)

[2] или даже лучше, используя implicits, вы можете добавить функцию в класс RDD для печати ее содержимого.

implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) {
    def print = rdd.foreach(println)
}

пример использования:

val rdd = sc.parallelize(List(1,2,3,4)).map(_*2)

p(rdd) // 1
rdd.print // 2

выход:

2
6
4
8

PS. Это имеет смысл только если вы работаете в локальном режиме и с небольшим количеством данных. В противном случае, вы либо не сможете увидеть результаты на клиенте или не хватает памяти из-за большого результата набора данных.