Как хранить пользовательские объекты в наборе данных?
по данным Представляем Наборы Данных Spark:
поскольку мы с нетерпением ждем Spark 2.0, мы планируем некоторые интересные улучшения в наборах данных, в частности: ... Пользовательские кодеры - в то время как в настоящее время мы автоматически генерируем кодеры для самых разных типов, мы хотели бы открыть API для пользовательских объектов.
и пытается сохранить пользовательский тип в Dataset
привести к следующей ошибке, как:
Не удается найти кодер для тип, сохраненный в наборе данных. Примитивные типы (Int, String и т. д.) и типы продуктов (классы case) поддерживаются путем импорта sqlContext.неявный._ Поддержка сериализации других типов будет добавлена в будущих версиях
или:
Java.ленг.UnsupportedOperationException: кодировщик не найден ....
существуют ли какие-либо обходные пути?
обратите внимание, что этот вопрос существует только как точка входа для сообщества Wiki ответ. Не стесняйтесь обновлять / улучшать как вопрос, так и ответ.
7 ответов:
обновление
этот ответ по-прежнему действителен и информативен, хотя теперь все лучше с 2.2/2.3, что добавляет встроенную поддержку кодировщика для
Set
,Seq
,Map
,Date
,Timestamp
иBigDecimal
. Если вы придерживаетесь создания типов только с классами case и обычными типами Scala, вы должны быть в порядке только с неявным inSQLImplicits
.
к сожалению, практически ничего не было добавлено, чтобы помочь с этим. В поисках
@since 2.0.0
inEncoders.scala
илиSQLImplicits.scala
находит вещи, в основном связанные с примитивными типами (и некоторые настройки классов case). Итак, первое, что нужно сказать: в настоящее время нет реальной хорошей поддержки пользовательских кодеров классов. После этого следует несколько трюков, которые делают настолько хорошую работу, насколько мы можем надеяться, учитывая то, что мы в настоящее время имеем в нашем распоряжении. Как отказ от ответственности: это не будет работать идеально и я постараюсь сделать все ограничения ясно и заранее.в чем именно проблема
когда вы хотите создать набор данных, Spark " требует кодировщика (для преобразования объекта JVM типа T В и из внутреннего представления Spark SQL), который обычно создается автоматически через implicits из A
SparkSession
, или может быть создан явно путем вызова статических методов наEncoders
" (С документы наcreateDataset
). Кодировщик примет видEncoder[T]
здесьT
тип вы кодируете. Первое предложение-добавитьimport spark.implicits._
(который дает вам эти неявные кодеры) и второе предложение-явно передать неявный кодер с помощью этой набор функций, связанных с энкодером.для обычных классов нет кодера, поэтому
import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
даст вам следующую неявную связанную ошибку времени компиляции:
не удается найти кодер для типа, хранящегося в a Набор данных. Примитивные типы (Int, String и т. д.) и типы продуктов (классы case) поддерживаются путем импорта sqlContext.неявный._ Поддержка сериализации других типов будет добавлена в будущих версиях
однако, если вы обернете любой тип, который вы только что использовали, чтобы получить вышеуказанную ошибку в каком-то классе, который расширяет
Product
, ошибка запутанно задерживается во время выполнения, так чтоimport spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
компилируется просто отлично, но терпит неудачу во время выполнения с
java.ленг.UnsupportedOperationException: кодировщик для MyObj не найден
причина этого заключается в том, что кодеры Искра создается с неявные преобразования, на самом деле сделаны только во время выполнения (через скала relfection). В этом случае все проверки Spark во время компиляции заключается в том, что самый внешний класс расширяет
Product
(что делают все классы case), и только во время выполнения понимает, что он все еще не знает, что делать сMyObj
(та же проблема возникает, если я пытался сделатьDataset[(Int,MyObj)]
- Искра ждет, пока время выполнения блевать наMyObj
). Это центральные проблемы, которые остро нуждаются в исправлении:
- некоторые классы, которые расширяют
Product
компиляции, несмотря на всегда сбой во время выполнения, и- нет способа передачи пользовательских кодеров для вложенных типов (у меня нет способа подачи Spark кодера только для
MyObj
такой, что он потом умеет кодироватьWrap[MyObj]
или(Int,MyObj)
).просто использовать
kryo
решение, которое все предлагают, заключается в использовании
kryo
шифратора.import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
это становится довольно утомительным быстро. Особенно если ваш код манипулирует всевозможными наборами данных, объединением, группировкой и т. д. В итоге вы набираете кучу дополнительных неявные преобразования. Итак, почему бы просто не сделать неявное, что делает все это автоматически?
import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
и теперь, кажется, я могу сделать почти все, что я хочу (пример ниже не работает в
spark-shell
здесьspark.implicits._
автоматически импортируется)class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
или почти. Проблема в том, что с помощью
kryo
приводит к тому, что Spark просто сохраняет каждую строку в наборе данных как плоский двоичный объект. Ибоmap
,filter
,foreach
этого вполне достаточно, но для таких операций, какjoin
, Spark действительно нуждается в том, чтобы они были разделены на столбцы. Проверка схемы наd2
илиd3
, вы видите, что есть только один двоичный столбец:d2.printSchema // root // |-- value: binary (nullable = true)
частичное решение кортежи
так что, используя магию неявные преобразования в Scala (более 6.26.3 Разрешения Перегрузки), я могу сделать себе серию неявные преобразования, которые будут делать так же хорошо, как это возможно, по крайней мере для кортежей, и будет хорошо работать с существующими неявные преобразования:
import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these
затем, вооружившись этими имплицитами, я могу сделать свой пример выше, хотя и с некоторым переименованием столбца
class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
я еще не понял, как получить ожидаемые имена кортежей (
_1
,_2
, ...) по умолчанию без переименования их - если кто-то хочет поиграть с этим, этой название"value"
знакомится и этой где обычно добавляются имена кортежей. Однако ключевым моментом является то, что у меня теперь есть хорошая структурированная схема:d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true)
Итак, вкратце, этот обходной путь:
- позволяет нам получить отдельные столбцы для кортежей (так что мы можем присоединиться к кортежам снова, ура!)
- мы снова можем просто полагаться на неявные преобразования (так что не нужно быть проездом в
kryo
все на месте)- почти полностью обратно совместим с
import spark.implicits._
(С некоторым переименованием)- тут не давайте присоединимся к
kyro
сериализованные двоичные столбцы, не говоря уже о полях, которые могут иметь- имеет неприятный побочный эффект переименования некоторых столбцов кортежа в "значение" (при необходимости это может быть отменено путем преобразования
.toDF
, указание новых имен столбцов и преобразование обратно в набор данных - и имена схем, похоже, сохраняются через соединения, где они наиболее необходимы).частичное решение для классов в целом
Этот менее приятный и не имеет хорошего решения. Однако теперь, когда у нас есть решение кортежа выше, у меня есть догадка, что неявное решение преобразования из другого ответа будет немного менее болезненным, так как вы можете конвертировать более сложные классы для кортежей. Затем, после создания набора данных, вы, вероятно, переименуете столбцы, используя подход dataframe. Если все пойдет хорошо, это действительно улучшение, так как теперь я могу выполнять соединения на полях моих классов. Если бы я только использовал один плоский двоичный
kryo
сериализатор это было бы невозможно.вот пример, который делает всего понемногу: у меня есть класс
MyObj
, который имеет поля типаInt
,java.util.UUID
, иSet[String]
. Первый заботится о себе. Второй, хотя я мог бы сериализовать с помощьюkryo
было бы более полезно, если сохраняется какString
(посколькуUUID
s, как правило, то, что я хочу присоединиться против). Третий действительно просто принадлежит в двоичном столбце.class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
теперь я могу создать набор данных с хорошей схемой, используя этот механизм:
val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded]
и схема показывает мне, что я столбцы с правильными именами и с первыми двумя обеими вещами я могу присоединиться против.
d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true)
используя универсальные датчики.
на данный момент доступны два универсальных кодера
kryo
иjavaSerialization
где последний явно описывается как:крайне неэффективно и должно использоваться только в крайнем случае.
предполагая следующий класс
class Bar(i: Int) { override def toString = s"bar $i" def bar = i }
вы можете использовать эти кодеры, добавив неявное кодировщик:
object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] }
которые могут быть использованы вместе следующим образом:
object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } }
он хранит предметы как
binary
столбец так при преобразовании вDataFrame
вы получаете следующую схему:root |-- value: binary (nullable = true)
также можно кодировать кортежи с помощью
kryo
кодер для конкретного поля:val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
обратите внимание, что мы не зависим от неявных кодеров здесь, но передаем кодер явно, поэтому это, скорее всего, не будет работать с
toDS
метод.использование неявных преобразований:
обеспечить неявные преобразования между представлением, которое может быть закодировано и пользовательского класса, например:
object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } }
вопросы:
кодеры работают более или менее одинаково в
Spark2.0
. ИKryo
по-прежнему рекомендуется использоватьserialization
выбор.вы можете посмотреть на следующий пример с spark-shell
scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old.
до сих пор] не было
appropriate encoders
в нынешнем объеме, так что наши лица не были закодированы какbinary
значения. Но это изменится, как только мы предоставим некоторыеimplicit
кодеры с помощьюKryo
сериализация.// Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old.
в случае Java Bean класса, это может быть полезно
import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
теперь вы можете просто прочитать фрейм данных как пользовательский фрейм данных
dataFrame.as[MyClass]
это создаст пользовательский кодер класса, а не двоичный.
вы можете использовать UDTRegistration, а затем Case-классы, кортежи и т. д... все работают правильно с вашим определенным пользователем типом!
скажем, вы хотите использовать пользовательское перечисление:
trait CustomEnum { def value:String } case object Foo extends CustomEnum { val value = "F" } case object Bar extends CustomEnum { val value = "B" } object CustomEnum { def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get }
зарегистрируйте его следующим образом:
// First define a UDT class for it: class CustomEnumUDT extends UserDefinedType[CustomEnum] { override def sqlType: DataType = org.apache.spark.sql.types.StringType override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value) // Note that this will be a UTF8String type override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString) override def userClass: Class[CustomEnum] = classOf[CustomEnum] } // Then Register the UDT Class! // NOTE: you have to put this file into the org.apache.spark package! UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
тогда используйте его!
case class UsingCustomEnum(id:Int, en:CustomEnum) val seq = Seq( UsingCustomEnum(1, Foo), UsingCustomEnum(2, Bar), UsingCustomEnum(3, Foo) ).toDS() seq.filter(_.en == Foo).show() println(seq.collect())
скажем, вы хотите использовать полиморфную запись:
trait CustomPoly case class FooPoly(id:Int) extends CustomPoly case class BarPoly(value:String, secondValue:Long) extends CustomPoly
... и использовать его так:
case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show()
вы можете написать пользовательский UDT, который кодирует все в байтах (я использую здесь сериализацию java, но, вероятно, лучше использовать контекст Kryo Spark).
сначала определите класс UDT:
class CustomPolyUDT extends UserDefinedType[CustomPoly] { val kryo = new Kryo() override def sqlType: DataType = org.apache.spark.sql.types.BinaryType override def serialize(obj: CustomPoly): Any = { val bos = new ByteArrayOutputStream() val oos = new ObjectOutputStream(bos) oos.writeObject(obj) bos.toByteArray } override def deserialize(datum: Any): CustomPoly = { val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]]) val ois = new ObjectInputStream(bis) val obj = ois.readObject() obj.asInstanceOf[CustomPoly] } override def userClass: Class[CustomPoly] = classOf[CustomPoly] }
затем зарегистрировать его:
// NOTE: The file you do this in has to be inside of the org.apache.spark package! UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
затем вы можете использовать его!
// As shown above: case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show()
мои примеры будут на Java, но я не думаю, что это будет трудно адаптироваться к Scala.
Я был довольно успешным преобразования
RDD<Fruit>
доDataset<Fruit>
используя Искра.createDataset и энкодеры.в зернах покаFruit
простой Java Bean.Шаг 1: создайте простой Java-компонент.
public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out }
Я бы придерживался классов с примитивными типами и строками в качестве полей перед тем, как DataBricks люди усиливают свои кодеры. если у вас есть класс с вложенным объектом, создайте еще один простой Java-Боб со всеми его полями, сглаженными, поэтому вы можете использовать преобразования RDD для сопоставления сложного типа с более простым. конечно, это немного дополнительная работа, но я думаю, что это поможет много на производительность работы с плоской схемой.
Шаг 2: получите свой набор данных из RDD
SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List<Fruit> fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList); RDD<Fruit> fruitRDD = fruitJavaRDD.rdd(); Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class); Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);
и вуаля! Вспенить, смыть, повторять.
для тех, кто может в моей ситуации я положил мой ответ здесь тоже.
в частности,
Я читал "набор типизированных данных" из SQLContext. Так оригинальный формат таблицы данных.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
затем преобразуйте его в RDD с помощью rdd.карта() с изменяемым.WrappedArray типа.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
результат:
(1,Set(1))