Wie funktioniert HashPartitioner?

82

Ich habe die Dokumentation von gelesen HashPartitioner. Leider wurde außer den API-Aufrufen nicht viel erklärt. Ich gehe davon aus, dass HashPartitionerdie verteilte Menge basierend auf dem Hash der Schlüssel partitioniert wird. Zum Beispiel, wenn meine Daten wie sind

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

Der Partitionierer würde dies also in verschiedene Partitionen einteilen, wobei dieselben Schlüssel in dieselbe Partition fallen. Ich verstehe jedoch die Bedeutung des Konstruktorarguments nicht

new HashPartitoner(numPartitions) //What does numPartitions do?

Wie würden sich die Ergebnisse für den obigen Datensatz unterscheiden, wenn ich dies tun würde?

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

Wie funktioniert das HashPartitionereigentlich?

Sohaib
quelle

Antworten:

161

Lassen Sie uns Ihren Datensatz etwas interessanter machen:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

Wir haben sechs Elemente:

rdd.count
Long = 6

kein Partitionierer:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

und acht Partitionen:

rdd.partitions.length
Int = 8

Definieren wir nun einen kleinen Helfer, um die Anzahl der Elemente pro Partition zu zählen:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Da wir keinen Partitionierer haben, wird unser Dataset gleichmäßig auf Partitionen verteilt ( Standardpartitionierungsschema in Spark ):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

Anfangsverteilung

Lassen Sie uns nun unseren Datensatz neu partitionieren:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Da der übergebene Parameter die HashPartitionerAnzahl der Partitionen definiert, haben wir eine Partition erwartet:

rddOneP.partitions.length
Int = 1

Da wir nur eine Partition haben, enthält sie alle Elemente:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

Hash-Partitionierer-1

Beachten Sie, dass die Reihenfolge der Werte nach dem Mischen nicht deterministisch ist.

Genauso, wenn wir verwenden HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

Wir werden 2 Partitionen bekommen:

rddTwoP.partitions.length
Int = 2

Da rdddurch Schlüsseldaten partitioniert wird, werden diese nicht mehr gleichmäßig verteilt:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

Denn mit drei Schlüsseln und nur zwei verschiedenen hashCodeMod- Werten numPartitionsgibt es hier nichts Unerwartetes:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Nur um das oben Gesagte zu bestätigen:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

Hash-Partitionierer-2

Schließlich erhalten HashPartitioner(7)wir sieben Partitionen, drei nicht leere mit jeweils 2 Elementen:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

Hash-Partitionierer-7

Zusammenfassung und Notizen

  • HashPartitioner Nimmt ein einzelnes Argument, das die Anzahl der Partitionen definiert
  • Werte werden Partitionen mit Hilfe hashvon Schlüsseln zugewiesen . hashFunktion kann in Abhängigkeit von der Sprache unterscheiden (Scala RDD kann verwendet werden hashCode, DataSetsverwenden MurmurHash 3, PySpark, portable_hash).

    In einem einfachen Fall wie diesem, in dem der Schlüssel eine kleine Ganzzahl ist, können Sie davon ausgehen, dass hashes sich um eine Identität handelt ( i = hash(i)).

    Die Scala-API verwendet nonNegativeMod, um die Partition basierend auf dem berechneten Hash zu bestimmen.

  • Wenn die Verteilung der Schlüssel nicht einheitlich ist, kann dies zu Situationen führen, in denen ein Teil Ihres Clusters inaktiv ist

  • Schlüssel müssen hashbar sein. Sie können meine Antwort auf eine Liste als Schlüssel für PySparks reductByKey überprüfen, um Informationen zu PySpark-spezifischen Problemen zu erhalten. Ein weiteres mögliches Problem wird in der HashPartitioner-Dokumentation hervorgehoben :

    Java-Arrays verfügen über HashCodes, die auf den Identitäten der Arrays und nicht auf deren Inhalt basieren. Wenn Sie also versuchen, ein RDD [Array [ ]] oder RDD [(Array [ ], _)] mit einem HashPartitioner zu partitionieren, wird ein unerwartetes oder falsches Ergebnis erzielt.

  • In Python 3 müssen Sie sicherstellen, dass das Hashing konsistent ist. Siehe Was bedeutet Ausnahme: Die Zufälligkeit des Hash-Strings sollte über PYTHONHASHSEED im Pyspark deaktiviert werden.

  • Der Hash-Partitionierer ist weder injektiv noch surjektiv. Einer einzelnen Partition können mehrere Schlüssel zugewiesen werden, und einige Partitionen können leer bleiben.

  • Bitte beachten Sie, dass derzeit hashbasierte Methoden in Scala nicht funktionieren, wenn sie mit REPL-definierten Fallklassen kombiniert werden ( Gleichheit der Fallklassen in Apache Spark ).

  • HashPartitioner(oder eine andere Partitioner) mischt die Daten. Wenn die Partitionierung nicht zwischen mehreren Vorgängen wiederverwendet wird, wird die zu mischende Datenmenge nicht reduziert.

null323
quelle
Ausgezeichnetes Schreiben, danke. Allerdings habe ich in den Bildern bemerkt, dass Sie (1, None)mit hash(2) % PP Partition ist. Sollte es nicht sein hash(1) % P?
javamonkey79
partitionByIch benutze Spark 2.2 und es gibt keine API in rdd. Unter dataframe.write befindet sich eine partitionBy, die Partitioner jedoch nicht als Argument verwendet.
Hakamami
Eine großartige Antwort ... Shuffle basiert auf Partionierer. Ein guter Partionierer kann die gemischte Datenmenge reduzieren.
Leon
Großartige Antwort. Ich habe eine Frage, auf die ich im Internet keine solide Antwort bekomme. Wenn wir df.repartition (n) verwenden, dh wenn wir keine Spalte als Schlüssel angeben, wie funktioniert der Hash dann intern, da nichts zu hashen ist?
dsk
@dsk, wenn Sie keinen Schlüssel angeben, verwendet Repartition meiner Meinung nach RoundRobinPartitioning. Einige Diskussionen hier .
Mike Souder
6

RDDverteilt ist, bedeutet dies, dass es auf einige Teile aufgeteilt ist. Jede dieser Partitionen befindet sich möglicherweise auf einem anderen Computer. Der Hash-Partitionierer mit Argument numPartitionswählt auf (key, value)folgende Weise aus, auf welcher Partition das Paar platziert werden soll:

  1. Erstellt genau numPartitionsPartitionen.
  2. Orte (key, value)in Partition mit NummerHash(key) % numPartitions
abalcerek
quelle
3

Die HashPartitioner.getPartitionMethode verwendet einen Schlüssel als Argument und gibt den Index der Partition zurück, zu der der Schlüssel gehört. Der Partitionierer muss die gültigen Indizes kennen, damit er Zahlen im richtigen Bereich zurückgibt. Die Anzahl der Partitionen wird durch das numPartitionsKonstruktorargument angegeben .

Die Implementierung kehrt ungefähr zurück key.hashCode() % numPartitions. Weitere Informationen finden Sie unter Partitioner.scala .

Daniel Darabos
quelle