Как сопоставить имена столбцов фрейма данных с атрибутами класса case Scala?


Имена столбцов в этом примере из spark-sql происходят из case class Person.

case class Person(name: String, age: Int)

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

Https://spark.apache.org/docs/1.1.0/sql-programming-guide.html

Однако во многих случаях имена параметров могут быть изменены. Это приведет к тому, что столбцы не будут найдены, если файл не был обновлен для отражения изменений.

Как я могу указать соответствующее сопоставление?

Я думаю примерно так:

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false)
  ))


  val ps: Seq[Person] = ???

  val personRDD = sc.parallelize(ps)

  // Apply the schema to the RDD.
  val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
1 10

1 ответ:

В принципе, все отображение, которое вам нужно сделать, может быть достигнуто с помощью DataFrame.select(...). (Здесь я предполагаю, что никакие преобразования типов не нужны.) Учитывая прямое и обратное отображение как карты, существенной частью является

val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
// personsDF your original dataframe  
val mappedDF = personsDF.select( mapping: _* )

Где отображение-это массив Column s с псевдонимом.

Пример кода

object Example {   

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkContext, SparkConf}

  case class Person(name: String, age: Int)

  object Mapping {
    val from = Map("name" -> "a", "age" -> "b")
    val to = Map("a" -> "name", "b" -> "age")
  }

  def main(args: Array[String]) : Unit = {
    // init
    val conf = new SparkConf()
      .setAppName( "Example." )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF

    writeParquet( personsDF, "persons.parquet", sc, sqlContext)

    val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext )
  }

  def writeParquet(personsDF: DataFrame, path:String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.from

    val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray

    val mappedDF = personsDF.select( mapping: _* )
    mappedDF.write.parquet("/output/path.parquet") // parquet with columns "a" and "b"
  }

  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns a and b

    val mapping = to.map{ (x:(String, String)) => df(x._1).as(x._2) }.toArray
    df.select( mapping: _* )
  }
}

Замечание

Если вам нужно преобразовать фрейм данных обратно в RDD [Person], то

val rdd : RDD[Row] = personsDF.rdd
val personsRDD : Rdd[Person] = rdd.map { r: Row => 
  Person( r.getAs("person"), r.getAs("age") )
}

Альтернативы

Также посмотрите на , Как преобразовать spark SchemaRDD в RDD моего класса case?