Как хранить пользовательские объекты в наборе данных?


по данным Представляем Наборы Данных Spark:

поскольку мы с нетерпением ждем Spark 2.0, мы планируем некоторые интересные улучшения в наборах данных, в частности: ... Пользовательские кодеры - в то время как в настоящее время мы автоматически генерируем кодеры для самых разных типов, мы хотели бы открыть API для пользовательских объектов.

и пытается сохранить пользовательский тип в Dataset привести к следующей ошибке, как:

Не удается найти кодер для тип, сохраненный в наборе данных. Примитивные типы (Int, String и т. д.) и типы продуктов (классы case) поддерживаются путем импорта sqlContext.неявный._ Поддержка сериализации других типов будет добавлена в будущих версиях

или:

Java.ленг.UnsupportedOperationException: кодировщик не найден ....

существуют ли какие-либо обходные пути?


обратите внимание, что этот вопрос существует только как точка входа для сообщества Wiki ответ. Не стесняйтесь обновлять / улучшать как вопрос, так и ответ.

7 109

7 ответов:

обновление

этот ответ по-прежнему действителен и информативен, хотя теперь все лучше с 2.2/2.3, что добавляет встроенную поддержку кодировщика для Set,Seq,Map,Date,Timestamp и BigDecimal. Если вы придерживаетесь создания типов только с классами case и обычными типами Scala, вы должны быть в порядке только с неявным in SQLImplicits.


к сожалению, практически ничего не было добавлено, чтобы помочь с этим. В поисках @since 2.0.0 in Encoders.scala или SQLImplicits.scala находит вещи, в основном связанные с примитивными типами (и некоторые настройки классов case). Итак, первое, что нужно сказать: в настоящее время нет реальной хорошей поддержки пользовательских кодеров классов. После этого следует несколько трюков, которые делают настолько хорошую работу, насколько мы можем надеяться, учитывая то, что мы в настоящее время имеем в нашем распоряжении. Как отказ от ответственности: это не будет работать идеально и я постараюсь сделать все ограничения ясно и заранее.

в чем именно проблема

когда вы хотите создать набор данных, Spark " требует кодировщика (для преобразования объекта JVM типа T В и из внутреннего представления Spark SQL), который обычно создается автоматически через implicits из A SparkSession, или может быть создан явно путем вызова статических методов на Encoders" (С документы на createDataset). Кодировщик примет вид Encoder[T] здесь T тип вы кодируете. Первое предложение-добавить import spark.implicits._ (который дает вам эти неявные кодеры) и второе предложение-явно передать неявный кодер с помощью этой набор функций, связанных с энкодером.

для обычных классов нет кодера, поэтому

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

даст вам следующую неявную связанную ошибку времени компиляции:

не удается найти кодер для типа, хранящегося в a Набор данных. Примитивные типы (Int, String и т. д.) и типы продуктов (классы case) поддерживаются путем импорта sqlContext.неявный._ Поддержка сериализации других типов будет добавлена в будущих версиях

однако, если вы обернете любой тип, который вы только что использовали, чтобы получить вышеуказанную ошибку в каком-то классе, который расширяет Product, ошибка запутанно задерживается во время выполнения, так что

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

компилируется просто отлично, но терпит неудачу во время выполнения с

java.ленг.UnsupportedOperationException: кодировщик для MyObj не найден

причина этого заключается в том, что кодеры Искра создается с неявные преобразования, на самом деле сделаны только во время выполнения (через скала relfection). В этом случае все проверки Spark во время компиляции заключается в том, что самый внешний класс расширяет Product (что делают все классы case), и только во время выполнения понимает, что он все еще не знает, что делать с MyObj (та же проблема возникает, если я пытался сделать Dataset[(Int,MyObj)] - Искра ждет, пока время выполнения блевать на MyObj). Это центральные проблемы, которые остро нуждаются в исправлении:

  • некоторые классы, которые расширяют Product компиляции, несмотря на всегда сбой во время выполнения, и
  • нет способа передачи пользовательских кодеров для вложенных типов (у меня нет способа подачи Spark кодера только для MyObj такой, что он потом умеет кодировать Wrap[MyObj] или (Int,MyObj)).

просто использовать kryo

решение, которое все предлагают, заключается в использовании kryo шифратора.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

это становится довольно утомительным быстро. Особенно если ваш код манипулирует всевозможными наборами данных, объединением, группировкой и т. д. В итоге вы набираете кучу дополнительных неявные преобразования. Итак, почему бы просто не сделать неявное, что делает все это автоматически?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

и теперь, кажется, я могу сделать почти все, что я хочу (пример ниже не работает в spark-shell здесь spark.implicits._ автоматически импортируется)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

или почти. Проблема в том, что с помощью kryo приводит к тому, что Spark просто сохраняет каждую строку в наборе данных как плоский двоичный объект. Ибо map,filter,foreach этого вполне достаточно, но для таких операций, как join, Spark действительно нуждается в том, чтобы они были разделены на столбцы. Проверка схемы на d2 или d3, вы видите, что есть только один двоичный столбец:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

частичное решение кортежи

так что, используя магию неявные преобразования в Scala (более 6.26.3 Разрешения Перегрузки), я могу сделать себе серию неявные преобразования, которые будут делать так же хорошо, как это возможно, по крайней мере для кортежей, и будет хорошо работать с существующими неявные преобразования:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

затем, вооружившись этими имплицитами, я могу сделать свой пример выше, хотя и с некоторым переименованием столбца

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

я еще не понял, как получить ожидаемые имена кортежей (_1,_2, ...) по умолчанию без переименования их - если кто-то хочет поиграть с этим, этой название "value" знакомится и этой где обычно добавляются имена кортежей. Однако ключевым моментом является то, что у меня теперь есть хорошая структурированная схема:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Итак, вкратце, этот обходной путь:

  • позволяет нам получить отдельные столбцы для кортежей (так что мы можем присоединиться к кортежам снова, ура!)
  • мы снова можем просто полагаться на неявные преобразования (так что не нужно быть проездом в kryo все на месте)
  • почти полностью обратно совместим с import spark.implicits._ (С некоторым переименованием)
  • тут не давайте присоединимся к kyro сериализованные двоичные столбцы, не говоря уже о полях, которые могут иметь
  • имеет неприятный побочный эффект переименования некоторых столбцов кортежа в "значение" (при необходимости это может быть отменено путем преобразования .toDF, указание новых имен столбцов и преобразование обратно в набор данных - и имена схем, похоже, сохраняются через соединения, где они наиболее необходимы).

частичное решение для классов в целом

Этот менее приятный и не имеет хорошего решения. Однако теперь, когда у нас есть решение кортежа выше, у меня есть догадка, что неявное решение преобразования из другого ответа будет немного менее болезненным, так как вы можете конвертировать более сложные классы для кортежей. Затем, после создания набора данных, вы, вероятно, переименуете столбцы, используя подход dataframe. Если все пойдет хорошо, это действительно улучшение, так как теперь я могу выполнять соединения на полях моих классов. Если бы я только использовал один плоский двоичный kryo сериализатор это было бы невозможно.

вот пример, который делает всего понемногу: у меня есть класс MyObj, который имеет поля типа Int,java.util.UUID, и Set[String]. Первый заботится о себе. Второй, хотя я мог бы сериализовать с помощью kryo было бы более полезно, если сохраняется как String (поскольку UUIDs, как правило, то, что я хочу присоединиться против). Третий действительно просто принадлежит в двоичном столбце.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

теперь я могу создать набор данных с хорошей схемой, используя этот механизм:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

и схема показывает мне, что я столбцы с правильными именами и с первыми двумя обеими вещами я могу присоединиться против.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
  1. используя универсальные датчики.

    на данный момент доступны два универсальных кодера kryo и javaSerialization где последний явно описывается как:

    крайне неэффективно и должно использоваться только в крайнем случае.

    предполагая следующий класс

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    вы можете использовать эти кодеры, добавив неявное кодировщик:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    которые могут быть использованы вместе следующим образом:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    он хранит предметы как binary столбец так при преобразовании в DataFrame вы получаете следующую схему:

    root
     |-- value: binary (nullable = true)
    

    также можно кодировать кортежи с помощью kryo кодер для конкретного поля:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    обратите внимание, что мы не зависим от неявных кодеров здесь, но передаем кодер явно, поэтому это, скорее всего, не будет работать с toDS метод.

  2. использование неявных преобразований:

    обеспечить неявные преобразования между представлением, которое может быть закодировано и пользовательского класса, например:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

вопросы:

кодеры работают более или менее одинаково в Spark2.0. И Kryo по-прежнему рекомендуется использовать serialization выбор.

вы можете посмотреть на следующий пример с spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

до сих пор] не было appropriate encoders в нынешнем объеме, так что наши лица не были закодированы как binary значения. Но это изменится, как только мы предоставим некоторые implicit кодеры с помощью Kryo сериализация.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

в случае Java Bean класса, это может быть полезно

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

теперь вы можете просто прочитать фрейм данных как пользовательский фрейм данных

dataFrame.as[MyClass]

это создаст пользовательский кодер класса, а не двоичный.

вы можете использовать UDTRegistration, а затем Case-классы, кортежи и т. д... все работают правильно с вашим определенным пользователем типом!

скажем, вы хотите использовать пользовательское перечисление:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

зарегистрируйте его следующим образом:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

тогда используйте его!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

скажем, вы хотите использовать полиморфную запись:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... и использовать его так:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

вы можете написать пользовательский UDT, который кодирует все в байтах (я использую здесь сериализацию java, но, вероятно, лучше использовать контекст Kryo Spark).

сначала определите класс UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

затем зарегистрировать его:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

затем вы можете использовать его!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

мои примеры будут на Java, но я не думаю, что это будет трудно адаптироваться к Scala.

Я был довольно успешным преобразования RDD<Fruit> до Dataset<Fruit> используя Искра.createDataset и энкодеры.в зернах пока Fruit простой Java Bean.

Шаг 1: создайте простой Java-компонент.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Я бы придерживался классов с примитивными типами и строками в качестве полей перед тем, как DataBricks люди усиливают свои кодеры. если у вас есть класс с вложенным объектом, создайте еще один простой Java-Боб со всеми его полями, сглаженными, поэтому вы можете использовать преобразования RDD для сопоставления сложного типа с более простым. конечно, это немного дополнительная работа, но я думаю, что это поможет много на производительность работы с плоской схемой.

Шаг 2: получите свой набор данных из RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

и вуаля! Вспенить, смыть, повторять.

для тех, кто может в моей ситуации я положил мой ответ здесь тоже.

в частности,

  1. Я читал "набор типизированных данных" из SQLContext. Так оригинальный формат таблицы данных.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. затем преобразуйте его в RDD с помощью rdd.карта() с изменяемым.WrappedArray типа.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    результат:

    (1,Set(1))