Funken, der eine einzelne RDD optimal in zwei aufteilt

10

Ich habe einen großen Datensatz, den ich nach bestimmten Parametern in Gruppen aufteilen muss. Ich möchte, dass der Job so effizient wie möglich bearbeitet wird. Ich kann mir zwei Möglichkeiten vorstellen, dies zu tun

Option 1 - Karte aus Original-RDD erstellen und filtern

def customMapper(record):
    if passesSomeTest(record):
        return (1,record)
    else:
        return (0,record)

mappedRdd = rddIn.map(lambda x: customMapper(x))
rdd0 = mappedRdd.filter(lambda x: x[0]==0).cache()
rdd1 = mappedRdd.filter(lambda x: x[1]==1).cache()

Option 2 - Original-RDD direkt filtern

def customFilter(record):
    return passesSomeTest(record)

rdd0 = rddIn.filter(lambda x: customFilter(x)==False).cache()
rdd1 = rddIn.filter(customFilter).cache()

Die Faustmethode muss dreimal über alle Datensätze des Originaldatensatzes iterieren, wobei die zweite Methode dies nur zweimal tun muss. Unter normalen Umständen führt der Funke jedoch einige Grafiken hinter den Kulissen aus, sodass ich mir vorstellen kann, dass dies der Fall ist effektiv auf die gleiche Weise getan. Meine Fragen sind: a.) Ist eine Methode effizienter als die andere oder macht das Erstellen von Funkengraphen sie gleichwertig? B.) Ist es möglich, diese Aufteilung in einem einzigen Durchgang durchzuführen?

Jagartner
quelle
Ich fand mich auch mit einem sehr ähnlichen Problem wieder und fand keine wirkliche Lösung. Was tatsächlich passiert, ist aus diesem Code nicht ersichtlich, da Spark eine "träge Auswertung" hat und angeblich nur das ausführen kann, was es wirklich ausführen muss, und auch Karten, Filter und alles, was zusammen gemacht werden kann, kombinieren kann. Möglicherweise geschieht das, was Sie beschreiben , in einem einzigen Durchgang. Nicht vertraut genug mit den faulen Bewertungsmechanismen, um es zu sagen. Eigentlich habe ich gerade den .cache () bemerkt. Vielleicht gibt es eine Möglichkeit, nur einen .cache () zu erstellen und die vollständigen Ergebnisse zu erhalten?
user3780968

Antworten:

9

Lassen Sie mich zunächst sagen, dass ich kein Spark-Experte bin. Ich habe es in den letzten Monaten ziemlich oft benutzt und ich glaube, ich verstehe es jetzt, aber ich kann mich irren.

Beantworten Sie also Ihre Fragen:

a.) sie sind gleichwertig, aber nicht so, wie Sie es sehen; Spark optimiert das Diagramm nicht, wenn Sie sich fragen, aber das customMapperwird in beiden Fällen immer noch zweimal ausgeführt. Dies ist aufgrund der Tatsache , dass für die Funken, rdd1und rdd2sind zwei völlig verschiedene RDDs, und es wird die Transformationsgraphen von unten nach oben , ausgehend von den Blättern bauen; Option 1 bedeutet also:

rdd0 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==0).cache()
rdd1 = rddIn.map(lambda x: customMapper(x)).filter(lambda x: x[0]==1).cache()

Wie Sie sagten, customMapperwird zweimal ausgeführt (außerdem rddInwird es auch zweimal gelesen, was bedeutet, dass es, wenn es aus einer Datenbank stammt, möglicherweise noch langsamer ist).

b.) Es gibt einen Weg, Sie müssen sich nur cache()an der richtigen Stelle bewegen :

mappedRdd = rddIn.map(lambda x: customMapper(x)).cache()
rdd0 = mappedRdd.filter(lambda x: x[0]==0)
rdd1 = mappedRdd.filter(lambda x: x[0]==1)

Auf diese Weise sagen wir Funken, dass es die Teilergebnisse von speichern kann mappedRdd; Diese Teilergebnisse werden dann sowohl für rdd1als auch verwendet rdd2. Aus Sicht des Funkens entspricht dies:

mappedRdd = rddIn.map(lambda x: customMapper(x)).saveAsObjectFile('..')
# forget about everything
rdd0 = sc.objectFile('..').filter(lambda x: x[0]==0)
rdd1 = sc.objectFile('..').filter(lambda x: x[0]==1)
StefanoP
quelle