Как читать вложенную коллекцию в Spark


У меня есть паркетный стол с одной из колонн

, array >

Может выполнять запросы к этой таблице в Hive, используя синтаксис бокового вида.

Как прочитать эту таблицу в RDD, и что более важно, как фильтровать, сопоставлять и т. д. эту вложенную коллекцию в Spark?

Не удалось найти никаких ссылок на это в документации Spark. Заранее спасибо за любую информацию!

ПС. Было бы полезно привести некоторые статистические данные о таблица. Количество колонок в основной таблице ~600. Количество рядов ~200м. Количество "колонок" во вложенной коллекции ~10. Среднее число записей во вложенной коллекции ~35.

4 17

4 ответа:

Нет никакой магии в случае вложенной коллекции. Spark будет обрабатывать таким же образом a RDD[(String, String)] и a RDD[(String, Seq[String])].

Чтение такой вложенной коллекции из файлов Parquet может быть сложным, хотя.

Давайте возьмем пример из spark-shell (1.3.1):

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Запишите файл parquet:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Прочитайте файл паркета:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

Важной частью является row.getAs[Seq[Row]](1). Внутреннее представление вложенной последовательности struct является ArrayBuffer[Row], вы можете использовать любой супер-тип это вместо Seq[Row]. 1 - это индекс столбца во внешней строке. Я использовал метод getAs здесь, но есть альтернативы в последних версиях Spark. Смотрите исходный код признака строки .

Теперь, когда у вас есть RDD[Outer], вы можете применить любое желаемое преобразование или действие.
// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
Обратите внимание, что мы использовали библиотеку spark-SQL только для чтения файла parquet. Например, можно выбрать только нужные столбцы непосредственно в фрейме данных, прежде чем сопоставлять его с РДУ.
dataFrame.select('col1, 'col2).map { row => ... }

Я дам ответ на основе Python, так как это то, что я использую. Я думаю, что в скале есть нечто подобное.

Функция explode была добавлена в Spark 1.4.0 для обработки вложенных массивов в фреймах данных, в соответствии сPython API docs .

Создайте тестовый фрейм данных:

from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

Используйте explode для выравнивания столбца списка:

from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+

Другой подход будет использовать сопоставление шаблонов следующим образом:

val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
  case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
    case List(a:String, b: String) => (a, b)
  }).toList
})

Вы можете сопоставить паттерн непосредственно на строке, но это, скорее всего, не удастся по нескольким причинам.

Все приведенные выше ответы являются отличными ответами и решают этот вопрос с разных сторон; Spark SQL также является весьма полезным способом доступа к вложенным данным.

Вот пример использования функции explode () в SQL непосредственно для запроса вложенной коллекции.

SELECT hholdid, tsp.person_seq_no 
FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
        FROM disc_mrt.unified_fact uf
     )

Tsp_ids-это вложенная структура, которая имеет множество атрибутов, включая person_seq_no, который я выбираю во внешнем запросе выше.

Выше был протестирован в Spark 2.0. Я сделал небольшой тест, и он не работает в Spark 1.6. Этот вопрос был спросили, когда Spark 2 не было рядом, так что этот ответ прекрасно дополняет список доступных вариантов для работы с вложенными структурами.

Заметные не разрешенные JIRAs на explode () для доступа SQL: