Ich habe einen großen Datensatz, den ich nach bestimmten Parametern in Gruppen aufteilen muss. Ich möchte, dass der Job so effizient wie möglich bearbeitet wird. Ich kann mir zwei Möglichkeiten vorstellen, dies zu tun
Option 1 - Karte aus Original-RDD erstellen und filtern
def customMapper(record):
if passesSomeTest(record):
return (1,record)
else:
return (0,record)
mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()
Option 2 - Original-RDD direkt filtern
def customFilter(record):
return passesSomeTest(record)
rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()
Die Faustmethode muss dreimal über alle Datensätze des Originaldatensatzes iterieren, wobei die zweite Methode dies nur zweimal tun muss. Unter normalen Umständen führt der Funke jedoch einige Grafiken hinter den Kulissen aus, sodass ich mir vorstellen kann, dass dies der Fall ist effektiv auf die gleiche Weise getan. Meine Fragen sind: a.) Ist eine Methode effizienter als die andere oder macht das Erstellen von Funkengraphen sie gleichwertig? B.) Ist es möglich, diese Aufteilung in einem einzigen Durchgang durchzuführen?
quelle
Antworten:
Lassen Sie mich zunächst sagen, dass ich kein Spark-Experte bin. Ich habe es in den letzten Monaten ziemlich oft benutzt und ich glaube, ich verstehe es jetzt, aber ich kann mich irren.
Beantworten Sie also Ihre Fragen:
a.) sie sind gleichwertig, aber nicht so, wie Sie es sehen; Spark optimiert das Diagramm nicht, wenn Sie sich fragen, aber das
customMapper
wird in beiden Fällen immer noch zweimal ausgeführt. Dies ist aufgrund der Tatsache , dass für die Funken,rdd1
undrdd2
sind zwei völlig verschiedene RDDs, und es wird die Transformationsgraphen von unten nach oben , ausgehend von den Blättern bauen; Option 1 bedeutet also:Wie Sie sagten,
customMapper
wird zweimal ausgeführt (außerdemrddIn
wird es auch zweimal gelesen, was bedeutet, dass es, wenn es aus einer Datenbank stammt, möglicherweise noch langsamer ist).b.) Es gibt einen Weg, Sie müssen sich nur
cache()
an der richtigen Stelle bewegen :Auf diese Weise sagen wir Funken, dass es die Teilergebnisse von speichern kann
mappedRdd
; Diese Teilergebnisse werden dann sowohl fürrdd1
als auch verwendetrdd2
. Aus Sicht des Funkens entspricht dies:quelle