Apache Spark: map vs mapPartitions?


в чем разница между РДД этоmap и mapPartitions способ? И делает flatMap ведут себя как map или как mapPartitions? Спасибо.

(редактировать) т. е. в чем разница (семантически или с точки зрения исполнения) между

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

и:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
3 98

3 ответа:

в чем разница между картой RDD и методом mapPartitions?

метод карта преобразует каждый элемент элемент исходного RDD в один элемент результата RDD путем применения функции. mapPartitions преобразует каждый элемент раздел исходного RDD на несколько элементов результата (возможно, нет).

и flatMap ведет себя как карта или как mapPartitions?

ни flatMap работает на одном элементе (как map) и производит несколько элементов результата (как mapPartitions).

чертенок. Совет:

всякий раз, когда у вас есть тяжеловес инициализации, что должно быть сделано один раз для многих RDD элементов, а не один раз в RDD элемент, и если этот инициализация, например создание объектов из стороннего источника библиотека, не может быть сериализована (так что Spark может передавать его через кластер к рабочим узлам), используйте mapPartitions() вместо map(). mapPartitions() обеспечивает выполнение инициализации один раз на одного работника задача / поток / раздел вместо одного раза на RDD сведения элемент для пример : см. ниже.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. тут flatMap ведите себя как карта или как mapPartitions?

да. пожалуйста, см. Пример 2 flatmap.. это само собой разумеется.

Q1. какая разница между РДД это map и mapPartitions

map работает функция используется на уровне каждого элемента в то время как mapPartitions выполняет функцию на уровне раздела.

Пример: если у нас есть 100K элементов в частности RDD раздел, то мы будем стрелять от функции, используемой преобразования отображения 100K раз, когда мы используем map.

и наоборот, если мы используем mapPartitions затем мы вызовет только определенную функцию один раз, но мы передадим все записи 100K и вернем все ответы в одном вызове функции.

будет увеличение производительности с map работает над определенной функцией так много раз, особенно если функция делает что-то дорогое каждый раз, что ей не нужно было бы делать, если бы мы передали все элементы сразу(в случае mappartitions).

карта

применяет a функция преобразования для каждого элемента RDD и возвращает результат в виде нового RDD.

Список Вариантов

def map[U: ClassTag] (f: T = > U): RDD [U]

пример :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательный поток значений через входной аргумент (итератор[T]). Пользовательская функция должна возвращать еще один итератор[U]. Комбинированный итераторы результатов автоматически преобразуются в новый RDD. Пожалуйста обратите внимание, что кортежи (3,4) и (6,7) отсутствуют следующие результат из-за разделения мы выбрали.

preservesPartitioning указывает, сохраняет ли функция ввода разделитель, который должен быть false если это не пара RDD и вход функция не изменяет ключи.

Список Вариантов

def mapPartitions[U: ClassTag] (f: Iterator[T] = > Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

Пример 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Пример 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

вышеуказанная программа также может быть написана с помощью flatMap следующим образом.

Пример 2 с помощью flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

вывод :

mapPartitions трансформация происходит быстрее, чем map так как он вызывает вашу функцию один раз/раздел, а не один раз / элемент..

карта:

  1. он обрабатывает по одной строке за раз, очень похож на метод map() MapReduce.
  2. вы возвращаетесь из преобразования после каждой строки.

MapPartitions

  1. он обрабатывает весь раздел за один раз.
  2. вы можете вернуться из функции только один раз после обработки всего раздела.
  3. все промежуточные результаты должны храниться в памяти до тех пор, пока вы не обработаете весь раздел.
  4. предоставляет вам как setup () map() и cleanup () функция MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/