Задача не сериализуемая: java. io. NotSerializableException при вызове функции вне закрытия только на классы не объекты


получение странного поведения при вызове функции вне закрытия:

  • когда функция находится в объекте все работает
  • когда функция находится в классе вам :

задача не сериализуемая: java.io. NotSerializableException: testing

проблема в том, что мне нужен мой код в классе, а не объект. Есть идеи, почему это происходит? Сериализуется ли объект Scala (по умолчанию?)?

Это пример рабочего кода:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

это нерабочий пример:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
6 172

6 ответов:

Я не думаю, что другой ответ совершенно правильный. RDDs действительно сериализуемы, так что это не то, что вызывает вашу задачу не получится.

Искра распределенной вычислительной системы и ее основных абстракция представляет собой упругий распределенных наборов данных (RDD), которое можно рассматривать как совокупность. В основном, элементы RDD разделены по узлам кластера, но Spark абстрагирует это от пользователя, позволяя пользователю взаимодействуйте с RDD (коллекцией), как если бы она была локальной.

не вдаваться в слишком много деталей, но при запуске различных преобразований на RDD (map,flatMap,filter и другие), ваш код преобразования (закрытие):

  1. сериализовано на узле драйвера,
  2. отправлено на соответствующие узлы кластера,
  3. десериализации
  4. и, наконец, выполняется на узлах

вы можете конечно, запустите это локально (как в вашем примере), но все эти фазы (кроме доставки по сети) все еще происходят. [Это позволяет поймать любые ошибки еще до развертывания в производство]

что происходит во втором случае, так это то, что вы вызываете метод, определенный в классе testing внутри функции map. Spark видит, что и поскольку методы не могут быть сериализованы самостоятельно, Spark пытается сериализовать весьtesting класс, так что код будет работать при выполнении в другом JVM. У вас есть две возможности:

либо вы делаете тестирование класса сериализуемым, поэтому весь класс может быть сериализован Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

или вы делаете someFunc функция вместо метода (функции-это объекты в Scala), так что Spark сможет сериализовать его:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

аналогичная, но не та же проблема с сериализацией классов может представлять интерес для вас, и вы можете прочитать об этом в этой Искре саммита 2013 презентация.

в качестве примечания, вы можете переписать rddList.map(someFunc(_)) до rddList.map(someFunc), они точно такие же. Как правило, второй предпочтительнее, так как он менее многословен и чище для чтения.

EDIT (2015-03-15):Искра-5307 ввел SerializationDebugger и Spark 1.3.0 является первой версией, чтобы использовать его. Он добавляет путь сериализации к NotSerializableException. Когда NotSerializableException встречается, то отладчик посещает граф объектов, чтобы найти путь к объекту, который не может быть сериализован, и создает информацию, чтобы помочь пользователю найти объект.

в случае OP это то, что печатается в stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun, name: $outer, type: class testing)
    - object (class testing$$anonfun, <function1>)

Грега это в объяснении, почему исходный код не работает и два способа исправить эту проблему. Однако это решение не очень гибко; рассмотрим случай, когда ваше закрытие включает вызов метода на не-Serializable класс, который вы не можете контролировать. Вы не можете ни добавить Serializable тег к этому классу, ни изменить базовую реализацию, чтобы изменить метод в функцию.

Nilesh представляет собой отличное решение для это, но решение можно сделать как более кратким, так и общим:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

эта функция-сериализатор может быть использован для автоматического обертывания закрытия и вызовов методов:

rdd map genMapper(someFunc)

этот метод также имеет то преимущество, что не требует дополнительных зависимостей Shark для доступа KryoSerializationWrapper, так как холод Twitter уже втянут основной искрой

полный разговор полностью объясняет проблему, которая предлагает отличный способ смены парадигмы, чтобы избежать этих проблем сериализации: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

главный голосованный ответ в основном предлагает выбросить всю языковую функцию-то есть больше не использовать методы и использовать только функции. Действительно в функциональном программировании методы в классах должны быть избежать, но превращение их в функции не решает проблему дизайна здесь (см. ссылку выше).

в качестве быстрого решения в этой конкретной ситуации вы можете просто использовать @transient аннотация, чтобы сказать ему не пытаться сериализовать оскорбительное значение (здесь,Spark.ctx является пользовательским классом, а не одним из следующих имен OP):

@transient
val rddList = Spark.ctx.parallelize(list)

вы также можете реструктурировать код так, что rddList живет где-то еще, но это тоже неприятно.

будущее, вероятно Споры

в будущем Scala будет включать в себя эти вещи, называемые "спорами", которые должны позволить нам контролировать мелкое зерно, что делает и не совсем втягивается закрытием. Кроме того, это должно превратить все ошибки случайного вытягивания несериализуемых типов (или любых нежелательных значений) в ошибки компиляции, а не сейчас, что является ужасными исключениями времени выполнения / утечками памяти.

http://docs.scala-lang.org/sips/pending/spores.html

A совет по сериализации Kryo

при использовании kyro сделайте так, чтобы регистрация была необходима, это будет означать, что вы получаете ошибки вместо утечек памяти:

наконец, я знаю, что у крио есть крио.setRegistrationOptional (true) но у меня очень трудное время, пытаясь выяснить, как его использовать. Когда эта опция включена, kryo все еще, кажется, бросает исключения, если я не зарегистрировал классы."

стратегия регистрации классов с помощью kryo

конечно, это дает вам только контроль уровня типа, а не контроль уровня значения.

... еще больше идей впереди.

Я решил эту проблему, используя другой подход. Вам просто нужно сериализовать объекты перед прохождением через закрытие,а затем де-сериализовать. Этот подход просто работает, даже если ваши классы не Сериализуемы, потому что он использует Kryo за кулисами. Все, что вам нужно, это немного карри. ;)

вот пример того, как я это сделал:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Не стесняйтесь делать мля так сложно, как вы хотите, класс, объект, вложенные классы, ссылки на несколько 3-й партии библиотеки.

KryoSerializationWrapper относится к: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Я не совсем уверен, что это относится к Scala, но в Java я решил NotSerializableException на рефакторинг мой код так, что закрытие не открыть несериализуемый

я столкнулся с подобной проблемой, и что я понимаю из ответ Греги и

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

код doIT метод пытается сериализовать someFunc(_) метод, но поскольку метод не сериализуем, он пытается сериализовать класс тестирование который снова не сериализуется.

чтобы сделать ваш код работать, вы должны определить someFunc внутри doIT метод. Например:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

и если есть несколько функций, входящих в картину, то все эти функции должны быть доступны для родительского контекста.