Apache Spark: map vs mapPartitions?
в чем разница между РДД этоmap
и mapPartitions
способ? И делает flatMap
ведут себя как map
или как mapPartitions
? Спасибо.
(редактировать) т. е. в чем разница (семантически или с точки зрения исполнения) между
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
и:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
3 ответа:
в чем разница между картой RDD и методом mapPartitions?
метод карта преобразует каждый элемент элемент исходного RDD в один элемент результата RDD путем применения функции. mapPartitions преобразует каждый элемент раздел исходного RDD на несколько элементов результата (возможно, нет).
и flatMap ведет себя как карта или как mapPartitions?
ни flatMap работает на одном элементе (как
map
) и производит несколько элементов результата (какmapPartitions
).
чертенок. Совет:
всякий раз, когда у вас есть тяжеловес инициализации, что должно быть сделано один раз для многих
RDD
элементов, а не один раз вRDD
элемент, и если этот инициализация, например создание объектов из стороннего источника библиотека, не может быть сериализована (так что Spark может передавать его через кластер к рабочим узлам), используйтеmapPartitions()
вместоmap()
.mapPartitions()
обеспечивает выполнение инициализации один раз на одного работника задача / поток / раздел вместо одного раза наRDD
сведения элемент для пример : см. ниже.val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() // close dbconnection here newPartition.iterator // create a new iterator })
Q2. тут
flatMap
ведите себя как карта или какmapPartitions
?да. пожалуйста, см. Пример 2
flatmap
.. это само собой разумеется.Q1. какая разница между РДД это
map
иmapPartitions
map
работает функция используется на уровне каждого элемента в то время какmapPartitions
выполняет функцию на уровне раздела.Пример: если у нас есть 100K элементов в частности
RDD
раздел, то мы будем стрелять от функции, используемой преобразования отображения 100K раз, когда мы используемmap
.и наоборот, если мы используем
mapPartitions
затем мы вызовет только определенную функцию один раз, но мы передадим все записи 100K и вернем все ответы в одном вызове функции.будет увеличение производительности с
map
работает над определенной функцией так много раз, особенно если функция делает что-то дорогое каждый раз, что ей не нужно было бы делать, если бы мы передали все элементы сразу(в случаеmappartitions
).карта
применяет a функция преобразования для каждого элемента RDD и возвращает результат в виде нового RDD.
Список Вариантов
def map[U: ClassTag] (f: T = > U): RDD [U]
пример :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательный поток значений через входной аргумент (итератор[T]). Пользовательская функция должна возвращать еще один итератор[U]. Комбинированный итераторы результатов автоматически преобразуются в новый RDD. Пожалуйста обратите внимание, что кортежи (3,4) и (6,7) отсутствуют следующие результат из-за разделения мы выбрали.
preservesPartitioning
указывает, сохраняет ли функция ввода разделитель, который должен бытьfalse
если это не пара RDD и вход функция не изменяет ключи.Список Вариантов
def mapPartitions[U: ClassTag] (f: Iterator[T] = > Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Пример 1
val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Пример 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
вышеуказанная программа также может быть написана с помощью flatMap следующим образом.
Пример 2 с помощью flatmap
val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
вывод :
mapPartitions
трансформация происходит быстрее, чемmap
так как он вызывает вашу функцию один раз/раздел, а не один раз / элемент..
карта:
- он обрабатывает по одной строке за раз, очень похож на метод map() MapReduce.
- вы возвращаетесь из преобразования после каждой строки.
MapPartitions
- он обрабатывает весь раздел за один раз.
- вы можете вернуться из функции только один раз после обработки всего раздела.
- все промежуточные результаты должны храниться в памяти до тех пор, пока вы не обработаете весь раздел.
- предоставляет вам как setup () map() и cleanup () функция MapReduce
Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map
http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/