Как распечатать содержимое RDD?
Я пытаюсь распечатать содержимое коллекции на консоль Spark.
у меня есть типа:
linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]
и я использую команду:
scala> linesWithSessionId.map(line => println(line))
но это напечатано :
res1: org.апаш.искра.РДУ.RDD[Unit] = MappedRDD[4] на карте в :19
Как я могу записать RDD на консоль или сохранить его на диск, чтобы я мог просматривать его содержимое?
8 ответов:
если вы хотите просмотреть содержимое RDD, один из способов-использовать
collect()
:myRDD.collect().foreach(println)
это не очень хорошая идея, хотя, когда RDD имеет миллиарды строк. Используйте
take()
чтобы взять только несколько, чтобы распечатать:myRDD.take(n).foreach(println)
Если вы используете это на кластере, то
println
не будет печатать обратно в контекст. Вы должны принестиRDD
данные в сессии. Для этого вы можете заставить его к локальному массиву, а затем распечатать его:linesWithSessionId.toArray().foreach(line => println(line))
вы можете обменять ваш
RDD
доDataFrame
затемshow()
его.// For implicit conversion from RDD to DataFrame import spark.implicits._ fruits = sc.parallelize([("apple", 1), ("banana", 2), ("orange", 17)]) // convert to DF then show it fruits.toDF().show()
Это покажет верхние 20 строк ваших данных, поэтому размер ваших данных не должен быть проблемой.
+------+---+ | _1| _2| +------+---+ | apple| 1| |banana| 2| |orange| 17| +------+---+
есть, вероятно, много архитектурных различий между
myRDD.foreach(println)
иmyRDD.collect().foreach(println)
(не только "собирать", но и другие действия). Одна из различий, которые я видел, это когда делатьmyRDD.foreach(println)
выход будет в случайном порядке. Например: если мой rdd поступает из текстового файла, где каждая строка имеет номер, выход будет иметь другой порядок. Но когда я это сделалmyRDD.collect().foreach(println)
, порядок остается таким же, как и текстовый файл.
в python
linesWithSessionIdCollect = linesWithSessionId.collect() linesWithSessionIdCollect
это распечатает все содержимое RDD
вместо того, чтобы печатать каждый раз, вы можете;
[1]создайте общий метод печати внутри оболочки Spark.
def p(rdd: org.apache.spark.rdd.RDD[_]) = rdd.foreach(println)
[2] или даже лучше, используя implicits, вы можете добавить функцию в класс RDD для печати ее содержимого.
implicit class Printer(rdd: org.apache.spark.rdd.RDD[_]) { def print = rdd.foreach(println) }
пример использования:
val rdd = sc.parallelize(List(1,2,3,4)).map(_*2) p(rdd) // 1 rdd.print // 2
выход:
2 6 4 8
PS. Это имеет смысл только если вы работаете в локальном режиме и с небольшим количеством данных. В противном случае, вы либо не сможете увидеть результаты на клиенте или не хватает памяти из-за большого результата набора данных.