Задача не сериализуемая: java. io. NotSerializableException при вызове функции вне закрытия только на классы не объекты
получение странного поведения при вызове функции вне закрытия:
- когда функция находится в объекте все работает
- когда функция находится в классе вам :
задача не сериализуемая: java.io. NotSerializableException: testing
проблема в том, что мне нужен мой код в классе, а не объект. Есть идеи, почему это происходит? Сериализуется ли объект Scala (по умолчанию?)?
Это пример рабочего кода:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
это нерабочий пример:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
6 ответов:
Я не думаю, что другой ответ совершенно правильный. RDDs действительно сериализуемы, так что это не то, что вызывает вашу задачу не получится.
Искра распределенной вычислительной системы и ее основных абстракция представляет собой упругий распределенных наборов данных (RDD), которое можно рассматривать как совокупность. В основном, элементы RDD разделены по узлам кластера, но Spark абстрагирует это от пользователя, позволяя пользователю взаимодействуйте с RDD (коллекцией), как если бы она была локальной.
не вдаваться в слишком много деталей, но при запуске различных преобразований на RDD (
map
,flatMap
,filter
и другие), ваш код преобразования (закрытие):
- сериализовано на узле драйвера,
- отправлено на соответствующие узлы кластера,
- десериализации
- и, наконец, выполняется на узлах
вы можете конечно, запустите это локально (как в вашем примере), но все эти фазы (кроме доставки по сети) все еще происходят. [Это позволяет поймать любые ошибки еще до развертывания в производство]
что происходит во втором случае, так это то, что вы вызываете метод, определенный в классе
testing
внутри функции map. Spark видит, что и поскольку методы не могут быть сериализованы самостоятельно, Spark пытается сериализовать весьtesting
класс, так что код будет работать при выполнении в другом JVM. У вас есть две возможности:либо вы делаете тестирование класса сериализуемым, поэтому весь класс может быть сериализован Spark:
import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test extends java.io.Serializable { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } def someFunc(a: Int) = a + 1 }
или вы делаете
someFunc
функция вместо метода (функции-это объекты в Scala), так что Spark сможет сериализовать его:import org.apache.spark.{SparkContext,SparkConf} object Spark { val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]")) } object NOTworking extends App { new Test().doIT } class Test { val rddList = Spark.ctx.parallelize(List(1,2,3)) def doIT() = { val after = rddList.map(someFunc) after.collect().foreach(println) } val someFunc = (a: Int) => a + 1 }
аналогичная, но не та же проблема с сериализацией классов может представлять интерес для вас, и вы можете прочитать об этом в этой Искре саммита 2013 презентация.
в качестве примечания, вы можете переписать
rddList.map(someFunc(_))
доrddList.map(someFunc)
, они точно такие же. Как правило, второй предпочтительнее, так как он менее многословен и чище для чтения.EDIT (2015-03-15):Искра-5307 ввел SerializationDebugger и Spark 1.3.0 является первой версией, чтобы использовать его. Он добавляет путь сериализации к NotSerializableException. Когда NotSerializableException встречается, то отладчик посещает граф объектов, чтобы найти путь к объекту, который не может быть сериализован, и создает информацию, чтобы помочь пользователю найти объект.
в случае OP это то, что печатается в stdout:
Serialization stack: - object not serializable (class: testing, value: testing@2dfe2f00) - field (class: testing$$anonfun, name: $outer, type: class testing) - object (class testing$$anonfun, <function1>)
Грега это в объяснении, почему исходный код не работает и два способа исправить эту проблему. Однако это решение не очень гибко; рассмотрим случай, когда ваше закрытие включает вызов метода на не-
Serializable
класс, который вы не можете контролировать. Вы не можете ни добавитьSerializable
тег к этому классу, ни изменить базовую реализацию, чтобы изменить метод в функцию.Nilesh представляет собой отличное решение для это, но решение можно сделать как более кратким, так и общим:
def genMapper[A, B](f: A => B): A => B = { val locker = com.twitter.chill.MeatLocker(f) x => locker.get.apply(x) }
эта функция-сериализатор может быть использован для автоматического обертывания закрытия и вызовов методов:
rdd map genMapper(someFunc)
этот метод также имеет то преимущество, что не требует дополнительных зависимостей Shark для доступа
KryoSerializationWrapper
, так как холод Twitter уже втянут основной искрой
полный разговор полностью объясняет проблему, которая предлагает отличный способ смены парадигмы, чтобы избежать этих проблем сериализации: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
главный голосованный ответ в основном предлагает выбросить всю языковую функцию-то есть больше не использовать методы и использовать только функции. Действительно в функциональном программировании методы в классах должны быть избежать, но превращение их в функции не решает проблему дизайна здесь (см. ссылку выше).
в качестве быстрого решения в этой конкретной ситуации вы можете просто использовать
@transient
аннотация, чтобы сказать ему не пытаться сериализовать оскорбительное значение (здесь,Spark.ctx
является пользовательским классом, а не одним из следующих имен OP):@transient val rddList = Spark.ctx.parallelize(list)
вы также можете реструктурировать код так, что rddList живет где-то еще, но это тоже неприятно.
будущее, вероятно Споры
в будущем Scala будет включать в себя эти вещи, называемые "спорами", которые должны позволить нам контролировать мелкое зерно, что делает и не совсем втягивается закрытием. Кроме того, это должно превратить все ошибки случайного вытягивания несериализуемых типов (или любых нежелательных значений) в ошибки компиляции, а не сейчас, что является ужасными исключениями времени выполнения / утечками памяти.
http://docs.scala-lang.org/sips/pending/spores.html
A совет по сериализации Kryo
при использовании kyro сделайте так, чтобы регистрация была необходима, это будет означать, что вы получаете ошибки вместо утечек памяти:
наконец, я знаю, что у крио есть крио.setRegistrationOptional (true) но у меня очень трудное время, пытаясь выяснить, как его использовать. Когда эта опция включена, kryo все еще, кажется, бросает исключения, если я не зарегистрировал классы."
стратегия регистрации классов с помощью kryo
конечно, это дает вам только контроль уровня типа, а не контроль уровня значения.
... еще больше идей впереди.
Я решил эту проблему, используя другой подход. Вам просто нужно сериализовать объекты перед прохождением через закрытие,а затем де-сериализовать. Этот подход просто работает, даже если ваши классы не Сериализуемы, потому что он использует Kryo за кулисами. Все, что вам нужно, это немного карри. ;)
вот пример того, как я это сделал:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) (foo: Foo) : Bar = { kryoWrapper.value.apply(foo) } val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _ rdd.flatMap(mapper).collectAsMap() object Blah(abc: ABC) extends (Foo => Bar) { def apply(foo: Foo) : Bar = { //This is the real function } }
Не стесняйтесь делать мля так сложно, как вы хотите, класс, объект, вложенные классы, ссылки на несколько 3-й партии библиотеки.
KryoSerializationWrapper относится к: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Я не совсем уверен, что это относится к Scala, но в Java я решил
NotSerializableException
на рефакторинг мой код так, что закрытие не открыть несериализуемый
я столкнулся с подобной проблемой, и что я понимаю из ответ Греги и
object NOTworking extends App { new testing().doIT } //adding extends Serializable wont help class testing { val list = List(1,2,3) val rddList = Spark.ctx.parallelize(list) def doIT = { //again calling the fucntion someFunc val after = rddList.map(someFunc(_)) //this will crash (spark lazy) after.collect().map(println(_)) } def someFunc(a:Int) = a+1 }
код doIT метод пытается сериализовать someFunc(_) метод, но поскольку метод не сериализуем, он пытается сериализовать класс тестирование который снова не сериализуется.
чтобы сделать ваш код работать, вы должны определить someFunc внутри doIT метод. Например:
def doIT = { def someFunc(a:Int) = a+1 //function definition } val after = rddList.map(someFunc(_)) after.collect().map(println(_)) }
и если есть несколько функций, входящих в картину, то все эти функции должны быть доступны для родительского контекста.