Apache Spark: map vs mapPartitions?

133

Was ist der Unterschied zwischen einem RDD map und einer mapPartitionsMethode? Und verhält flatMapsich wie mapoder wie mapPartitions? Vielen Dank.

(bearbeiten) dh was ist der Unterschied (entweder semantisch oder in Bezug auf die Ausführung) zwischen

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

Und:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Nicholas White
quelle
3
Nachdem Sie die folgende Antwort gelesen haben, können Sie sich [diese Erfahrung] ansehen, die von jemandem geteilt wurde, der sie tatsächlich verwendet hat. ( Bzhangusc.wordpress.com/2014/06/19/… ) bzhangusc.wordpress.com/2014/06/19 /…
Abhidemon

Antworten:

121

Was ist der Unterschied zwischen der Karte eines RDD und der mapPartitions-Methode?

Das Verfahren Karte wandelt jedes Element der Quelle RDD in ein einzelnes Element des Ergebnisses RDD durch eine Funktion der Anwendung. mapPartitions konvertiert jede Partition der Quell-RDD in mehrere Elemente des Ergebnisses (möglicherweise keine).

Und verhält sich flatMap wie map oder wie mapPartitions?

Weder, flatMap arbeitet auf einem einzigen Element (wie map) und erzeugt mehrere Elemente des Ergebnisses (wie mapPartitions).

Alexey Romanov
quelle
3
Danke - verursacht die Karte also ein Mischen (oder ändert sie auf andere Weise die Anzahl der Partitionen)? Verschiebt es Daten zwischen Knoten? Ich habe mapPartitions verwendet, um das Verschieben von Daten zwischen Knoten zu vermeiden, war mir aber nicht sicher, ob flapMap dies tun würde.
Nicholas White
Wenn Sie an der Quelle zu finden - github.com/apache/incubator-spark/blob/... und github.com/apache/incubator-spark/blob/... - beide mapund flatMaphaben genau die gleichen Partitionen als Mutter.
Alexey Romanov
13
In einer Präsentation eines Redners auf dem San Francisco Spark Summit 2013 (goo.gl/JZXDCR) wird hervorgehoben, dass Aufgaben mit hohem Overhead pro Datensatz mit einer mapPartition besser abschneiden als mit einer Map-Transformation. Dies ist laut Präsentation auf die hohen Kosten für die Einrichtung einer neuen Aufgabe zurückzuführen.
Mikel Urkia
1
Ich sehe das Gegenteil - selbst bei sehr kleinen Operationen ist es schneller, mapPartitions aufzurufen und zu iterieren als map aufzurufen. Ich gehe davon aus, dass dies nur der Aufwand für das Starten der Sprach-Engine ist, die die Kartenaufgabe verarbeitet. (Ich bin in R, was möglicherweise mehr Startaufwand hat.) Wenn Sie mehrere Vorgänge ausführen würden, scheint mapPartitions etwas schneller zu sein - ich gehe davon aus, dass dies daran liegt, dass das RDD nur einmal gelesen wird. Selbst wenn das RDD im RAM zwischengespeichert wird, spart dies viel Overhead bei der Typkonvertierung.
Bob
3
mapIm Grunde genommen übernimmt Ihre Funktion fund gibt sie weiter iter.map(f). Im Grunde ist es eine bequeme Methode, die einschließt mapPartitions. Es würde mich wundern, wenn es einen Leistungsvorteil für einen reinen Map-Style-Transformationsjob geben würde (dh wenn die Funktion identisch ist). Wenn Sie einige Objekte für die Verarbeitung erstellen müssen und diese Objekte gemeinsam genutzt werden können, mapPartitionswäre dies von Vorteil.
NightWolf
129

Imp. TRINKGELD :

Wenn Sie eine Schwergewichtsinitialisierung haben, die einmal für viele RDDElemente und nicht einmal pro RDDElement durchgeführt werden sollte, und wenn diese Initialisierung, z. B. das Erstellen von Objekten aus einer Bibliothek eines Drittanbieters, nicht serialisiert werden kann (damit Spark sie über den Cluster an übertragen kann) die Arbeiterknoten) verwenden mapPartitions()anstelle von map(). mapPartitions()sieht vor, dass die Initialisierung einmal pro Worker-Task / Thread / Partition statt einmal pro RDDDatenelement durchgeführt wird. Beispiel: siehe unten.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. verhält flatMapsich wie eine Karte oder wie mapPartitions?

Ja. Bitte sehen Sie Beispiel 2 von flatmap.. es ist selbsterklärend.

Q1. Was ist der Unterschied zwischen einem RDD mapundmapPartitions

mapArbeitet die Funktion, die auf Elementebene verwendet wird, während mapPartitionsdie Funktion auf Partitionsebene ausgeführt wird.

Beispielszenario : Wenn eine bestimmteRDDPartition100.000 Elemente enthält, wird die von der Zuordnungstransformation verwendete Funktion bei Verwendung 100.000 Mal ausgelöstmap.

Wenn wir umgekehrt verwenden, mapPartitionsrufen wir die jeweilige Funktion nur einmal auf, übergeben jedoch alle 100.000 Datensätze und erhalten alle Antworten in einem Funktionsaufruf zurück.

Es wird einen Leistungsgewinn geben, da die mapArbeit an einer bestimmten Funktion so oft ausgeführt wird, insbesondere wenn die Funktion jedes Mal etwas Teueres tut, das nicht erforderlich wäre, wenn wir alle Elemente gleichzeitig übergeben würden (im Fall von mappartitions).

Karte

Wendet eine Transformationsfunktion auf jedes Element der RDD an und gibt das Ergebnis als neue RDD zurück.

Varianten auflisten

def map [U: ClassTag] (f: T => U): RDD [U]

Beispiel:

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Dies ist eine spezielle Zuordnung, die für jede Partition nur einmal aufgerufen wird. Der gesamte Inhalt der jeweiligen Partitionen steht über das Eingabeargument (Iterarator [T]) als sequentieller Wertestrom zur Verfügung. Die benutzerdefinierte Funktion muss noch einen weiteren Iterator [U] zurückgeben. Die kombinierten Ergebnisiteratoren werden automatisch in eine neue RDD konvertiert. Bitte beachten Sie, dass die Tupel (3,4) und (6,7) aufgrund der von uns gewählten Partitionierung im folgenden Ergebnis fehlen.

preservesPartitioningGibt an, ob die Eingabefunktion den Partitionierer falsebeibehält. Dies sollte der Fall sein, es sei denn, es handelt sich um ein RDD-Paar, und die Eingabefunktion ändert die Tasten nicht.

Varianten auflisten

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], konserviertPartitionierung: Boolean = false): RDD [U]

Beispiel 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Beispiel 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Das obige Programm kann auch wie folgt mit flatMap geschrieben werden.

Beispiel 2 mit Flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Fazit :

mapPartitionsDie Transformation ist schneller als mapda sie Ihre Funktion einmal / Partition aufruft, nicht einmal / Element.

Weiterführende Literatur: foreach Vs foreachPartitions Wann was verwenden?

Ram Ghadiyaram
quelle
4
Ich weiß, dass Sie dasselbe Ergebnis verwenden mapoder mapPartitionserzielen können (siehe die beiden Beispiele in der Frage). In dieser Frage geht es darum, warum Sie einen Weg über den anderen wählen. Die Kommentare in der anderen Antwort sind wirklich nützlich! Außerdem haben Sie das nicht erwähnt mapund flatMapgehen falsezu preservesPartitioningund was die Auswirkungen davon sind.
Nicholas White
2
Die Funktion, die jedes Mal ausgeführt wurde, und die Funktion, die einmal für die Parition ausgeführt wurde, war die Verbindung, die mir fehlte. Der Zugriff auf mehr als einen Datensatz gleichzeitig mit mapPartition ist von unschätzbarem Wert. schätzen die Antwort
Semikolons und Klebeband
1
Gibt es ein Szenario, in dem mapes besser ist als mapPartitions? Wenn mapPartitionses so gut ist, warum ist es nicht die Standardimplementierung der Karte?
Ruhong
1
@oneleggedmule: Beide sind für unterschiedliche Anforderungen gedacht. Wir müssen sie mit Bedacht einsetzen, wenn Sie Ressourcen wie Datenbankverbindungen (wie im obigen Beispiel gezeigt) instanziieren, die teuer sind. Mappartitions ist der richtige Ansatz, da eine Verbindung pro Partition vorhanden ist. auch saveAsTextFile intern verwendete Zuordnungen siehe
Ram Ghadiyaram
@oneleggedmule Aus meiner Sicht ist map () leichter zu verstehen und zu lernen und es ist auch eine gängige Methode für viele verschiedene Sprachen. Es ist möglicherweise auch einfacher zu verwenden als mapPartitions (), wenn jemand zu Beginn nicht mit dieser Spark-spezifischen Methode vertraut ist. Wenn es keinen Leistungsunterschied gibt, bevorzuge ich die Verwendung von map ().
Raymond Chen
15

Karte :

  1. Es verarbeitet jeweils eine Zeile, sehr ähnlich der map () -Methode von MapReduce.
  2. Sie kehren nach jeder Zeile von der Transformation zurück.

MapPartitions

  1. Es verarbeitet die gesamte Partition auf einmal.
  2. Sie können nach der Verarbeitung der gesamten Partition nur einmal von der Funktion zurückkehren.
  3. Alle Zwischenergebnisse müssen gespeichert werden, bis Sie die gesamte Partition verarbeiten.
  4. Bietet Ihnen die Funktion setup () map () und cleanup () von MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

KrazyGautam
quelle
in Bezug auf 2 - Wenn Sie Iterator-zu-Iterator-Transformationen durchführen und den Iterator nicht zu einer Sammlung einer Art materialisieren, müssen Sie nicht die gesamte Partition im Speicher halten, sondern auf diese Weise kann spark Teile der Partition auf die Festplatte verschütten.
Ilcord
4
Sie müssen nicht die gesamte Partition im Speicher halten, sondern das Ergebnis. Sie können das Ergebnis nicht zurückgeben, bis Sie die gesamte Partition verarbeitet haben
KrazyGautam