Как сопоставить имена столбцов фрейма данных с атрибутами класса case Scala?
Имена столбцов в этом примере из spark-sql происходят из case class Person
.
case class Person(name: String, age: Int)
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
Https://spark.apache.org/docs/1.1.0/sql-programming-guide.html
Однако во многих случаях имена параметров могут быть изменены. Это приведет к тому, что столбцы не будут найдены, если файл не был обновлен для отражения изменений.Как я могу указать соответствующее сопоставление?
Я думаю примерно так:
val schema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false)
))
val ps: Seq[Person] = ???
val personRDD = sc.parallelize(ps)
// Apply the schema to the RDD.
val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)
1 ответ:
В принципе, все отображение, которое вам нужно сделать, может быть достигнуто с помощью
DataFrame.select(...)
. (Здесь я предполагаю, что никакие преобразования типов не нужны.) Учитывая прямое и обратное отображение как карты, существенной частью являетсяval mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray // personsDF your original dataframe val mappedDF = personsDF.select( mapping: _* )
Где отображение-это массив
Column
s с псевдонимом.Пример кода
object Example { import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} case class Person(name: String, age: Int) object Mapping { val from = Map("name" -> "a", "age" -> "b") val to = Map("a" -> "name", "b" -> "age") } def main(args: Array[String]) : Unit = { // init val conf = new SparkConf() .setAppName( "Example." ) .setMaster( "local[*]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // create persons val persons = Seq(Person("bob", 35), Person("alice", 27)) val personsRDD = sc.parallelize(persons, 4) val personsDF = personsRDD.toDF writeParquet( personsDF, "persons.parquet", sc, sqlContext) val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext ) } def writeParquet(personsDF: DataFrame, path:String, sc: SparkContext, sqlContext: SQLContext) : Unit = { import Mapping.from val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray val mappedDF = personsDF.select( mapping: _* ) mappedDF.write.parquet("/output/path.parquet") // parquet with columns "a" and "b" } def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext) : Unit = { import Mapping.to val df = sqlContext.read.parquet(path) // this df has columns a and b val mapping = to.map{ (x:(String, String)) => df(x._1).as(x._2) }.toArray df.select( mapping: _* ) } }
Замечание
Если вам нужно преобразовать фрейм данных обратно в RDD [Person], то
val rdd : RDD[Row] = personsDF.rdd val personsRDD : Rdd[Person] = rdd.map { r: Row => Person( r.getAs("person"), r.getAs("age") ) }
Альтернативы
Также посмотрите на , Как преобразовать spark SchemaRDD в RDD моего класса case?