Lassen Sie uns Ihren Datensatz etwas interessanter machen:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
Wir haben sechs Elemente:
rdd.count
Long = 6
kein Partitionierer:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
und acht Partitionen:
rdd.partitions.length
Int = 8
Definieren wir nun einen kleinen Helfer, um die Anzahl der Elemente pro Partition zu zählen:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Da wir keinen Partitionierer haben, wird unser Dataset gleichmäßig auf Partitionen verteilt ( Standardpartitionierungsschema in Spark ):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Lassen Sie uns nun unseren Datensatz neu partitionieren:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Da der übergebene Parameter die HashPartitioner
Anzahl der Partitionen definiert, haben wir eine Partition erwartet:
rddOneP.partitions.length
Int = 1
Da wir nur eine Partition haben, enthält sie alle Elemente:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Beachten Sie, dass die Reihenfolge der Werte nach dem Mischen nicht deterministisch ist.
Genauso, wenn wir verwenden HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
Wir werden 2 Partitionen bekommen:
rddTwoP.partitions.length
Int = 2
Da rdd
durch Schlüsseldaten partitioniert wird, werden diese nicht mehr gleichmäßig verteilt:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Denn mit drei Schlüsseln und nur zwei verschiedenen hashCode
Mod- Werten numPartitions
gibt es hier nichts Unerwartetes:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Nur um das oben Gesagte zu bestätigen:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Schließlich erhalten HashPartitioner(7)
wir sieben Partitionen, drei nicht leere mit jeweils 2 Elementen:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
Zusammenfassung und Notizen
HashPartitioner
Nimmt ein einzelnes Argument, das die Anzahl der Partitionen definiert
Werte werden Partitionen mit Hilfe hash
von Schlüsseln zugewiesen . hash
Funktion kann in Abhängigkeit von der Sprache unterscheiden (Scala RDD kann verwendet werden hashCode
, DataSets
verwenden MurmurHash 3, PySpark, portable_hash
).
In einem einfachen Fall wie diesem, in dem der Schlüssel eine kleine Ganzzahl ist, können Sie davon ausgehen, dass hash
es sich um eine Identität handelt ( i = hash(i)
).
Die Scala-API verwendet nonNegativeMod
, um die Partition basierend auf dem berechneten Hash zu bestimmen.
Wenn die Verteilung der Schlüssel nicht einheitlich ist, kann dies zu Situationen führen, in denen ein Teil Ihres Clusters inaktiv ist
Schlüssel müssen hashbar sein. Sie können meine Antwort auf eine Liste als Schlüssel für PySparks reductByKey überprüfen, um Informationen zu PySpark-spezifischen Problemen zu erhalten. Ein weiteres mögliches Problem wird in der HashPartitioner-Dokumentation hervorgehoben :
Java-Arrays verfügen über HashCodes, die auf den Identitäten der Arrays und nicht auf deren Inhalt basieren. Wenn Sie also versuchen, ein RDD [Array [ ]] oder RDD [(Array [ ], _)] mit einem HashPartitioner zu partitionieren, wird ein unerwartetes oder falsches Ergebnis erzielt.
In Python 3 müssen Sie sicherstellen, dass das Hashing konsistent ist. Siehe Was bedeutet Ausnahme: Die Zufälligkeit des Hash-Strings sollte über PYTHONHASHSEED im Pyspark deaktiviert werden.
Der Hash-Partitionierer ist weder injektiv noch surjektiv. Einer einzelnen Partition können mehrere Schlüssel zugewiesen werden, und einige Partitionen können leer bleiben.
Bitte beachten Sie, dass derzeit hashbasierte Methoden in Scala nicht funktionieren, wenn sie mit REPL-definierten Fallklassen kombiniert werden ( Gleichheit der Fallklassen in Apache Spark ).
HashPartitioner
(oder eine andere Partitioner
) mischt die Daten. Wenn die Partitionierung nicht zwischen mehreren Vorgängen wiederverwendet wird, wird die zu mischende Datenmenge nicht reduziert.
(1, None)
mithash(2) % P
P Partition ist. Sollte es nicht seinhash(1) % P
?partitionBy
Ich benutze Spark 2.2 und es gibt keine API in rdd. Unter dataframe.write befindet sich eine partitionBy, die Partitioner jedoch nicht als Argument verwendet.RDD
verteilt ist, bedeutet dies, dass es auf einige Teile aufgeteilt ist. Jede dieser Partitionen befindet sich möglicherweise auf einem anderen Computer. Der Hash-Partitionierer mit ArgumentnumPartitions
wählt auf(key, value)
folgende Weise aus, auf welcher Partition das Paar platziert werden soll:numPartitions
Partitionen.(key, value)
in Partition mit NummerHash(key) % numPartitions
quelle
Die
HashPartitioner.getPartition
Methode verwendet einen Schlüssel als Argument und gibt den Index der Partition zurück, zu der der Schlüssel gehört. Der Partitionierer muss die gültigen Indizes kennen, damit er Zahlen im richtigen Bereich zurückgibt. Die Anzahl der Partitionen wird durch dasnumPartitions
Konstruktorargument angegeben .Die Implementierung kehrt ungefähr zurück
key.hashCode() % numPartitions
. Weitere Informationen finden Sie unter Partitioner.scala .quelle