Apache Spark: map vs mapPartitions?
в чем разница между РДД этоmap и mapPartitions способ? И делает flatMap ведут себя как map или как mapPartitions? Спасибо.
(редактировать) т. е. в чем разница (семантически или с точки зрения исполнения) между
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
и:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
3 ответа:
в чем разница между картой RDD и методом mapPartitions?
метод карта преобразует каждый элемент элемент исходного RDD в один элемент результата RDD путем применения функции. mapPartitions преобразует каждый элемент раздел исходного RDD на несколько элементов результата (возможно, нет).
и flatMap ведет себя как карта или как mapPartitions?
ни flatMap работает на одном элементе (как
map) и производит несколько элементов результата (какmapPartitions).
чертенок. Совет:
всякий раз, когда у вас есть тяжеловес инициализации, что должно быть сделано один раз для многих
RDDэлементов, а не один раз вRDDэлемент, и если этот инициализация, например создание объектов из стороннего источника библиотека, не может быть сериализована (так что Spark может передавать его через кластер к рабочим узлам), используйтеmapPartitions()вместоmap().mapPartitions()обеспечивает выполнение инициализации один раз на одного работника задача / поток / раздел вместо одного раза наRDDсведения элемент для пример : см. ниже.val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() // close dbconnection here newPartition.iterator // create a new iterator })
Q2. тут
flatMapведите себя как карта или какmapPartitions?да. пожалуйста, см. Пример 2
flatmap.. это само собой разумеется.Q1. какая разница между РДД это
mapиmapPartitions
mapработает функция используется на уровне каждого элемента в то время какmapPartitionsвыполняет функцию на уровне раздела.Пример: если у нас есть 100K элементов в частности
RDDраздел, то мы будем стрелять от функции, используемой преобразования отображения 100K раз, когда мы используемmap.и наоборот, если мы используем
mapPartitionsзатем мы вызовет только определенную функцию один раз, но мы передадим все записи 100K и вернем все ответы в одном вызове функции.будет увеличение производительности с
mapработает над определенной функцией так много раз, особенно если функция делает что-то дорогое каждый раз, что ей не нужно было бы делать, если бы мы передали все элементы сразу(в случаеmappartitions).карта
применяет a функция преобразования для каждого элемента RDD и возвращает результат в виде нового RDD.
Список Вариантов
def map[U: ClassTag] (f: T = > U): RDD [U]
пример :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))mapPartitions
это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательный поток значений через входной аргумент (итератор[T]). Пользовательская функция должна возвращать еще один итератор[U]. Комбинированный итераторы результатов автоматически преобразуются в новый RDD. Пожалуйста обратите внимание, что кортежи (3,4) и (6,7) отсутствуют следующие результат из-за разделения мы выбрали.
preservesPartitioningуказывает, сохраняет ли функция ввода разделитель, который должен бытьfalseесли это не пара RDD и вход функция не изменяет ключи.Список Вариантов
def mapPartitions[U: ClassTag] (f: Iterator[T] = > Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Пример 1
val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))Пример 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)вышеуказанная программа также может быть написана с помощью flatMap следующим образом.
Пример 2 с помощью flatmap
val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)вывод :
mapPartitionsтрансформация происходит быстрее, чемmapтак как он вызывает вашу функцию один раз/раздел, а не один раз / элемент..
карта:
- он обрабатывает по одной строке за раз, очень похож на метод map() MapReduce.
- вы возвращаетесь из преобразования после каждой строки.
MapPartitions
- он обрабатывает весь раздел за один раз.
- вы можете вернуться из функции только один раз после обработки всего раздела.
- все промежуточные результаты должны храниться в памяти до тех пор, пока вы не обработаете весь раздел.
- предоставляет вам как setup () map() и cleanup () функция MapReduce
Map Vs mapPartitionshttp://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Maphttp://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitionshttp://bytepadding.com/big-data/spark/spark-mappartitions/