Verwenden von Apache Spark für ML. Immer wieder Serialisierungsfehler

7

Daher verwende ich Spark für die Stimmungsanalyse und erhalte immer wieder Fehler mit den Serialisierern, die (glaube ich) zum Weitergeben von Python-Objekten verwendet werden.

PySpark worker failed with exception:
Traceback (most recent call last):
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/Users/abdul/Desktop/RSI/spark-1.0.1-bin-    hadoop1/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: __init__() takes exactly 3 arguments (2 given)

Den Code für Serializer finden Sie hier

und mein Code ist hier

Seashark97
quelle

Antworten:

10

Am häufigsten Fehler Serialisierung in (Py) Zündmittels , dass ein Teil Ihrer verteilten Code (zB Funktionen übergeben map) hat Abhängigkeiten auf nicht-serializable Daten . Betrachten Sie folgendes Beispiel:

rdd = sc.parallelize(range(5))
rdd = rdd.map(lambda x: x + 1)
rdd.collect()

Hier haben Sie die Sammel- und Lambda-Funktion verteilt, um sie an alle Mitarbeiter zu senden. Lambda ist vollständig in sich geschlossen, sodass es einfach ist, seine Binärdarstellung ohne Bedenken auf andere Knoten zu kopieren.

Lassen Sie uns die Dinge jetzt etwas interessanter machen:

f = open("/etc/hosts")
rdd = sc.parallelize(range(100))
rdd = rdd.map(lambda x: f.read())
rdd.collect()
f.close()

Boom! Seltsamer Fehler im Serialisierungsmodul! Was gerade passiert ist, ist, dass wir versucht hatten f, ein Dateiobjekt an Arbeiter weiterzugeben . Offensichtlich ist das Dateiobjekt ein Handle für lokale Daten und kann daher nicht an andere Computer gesendet werden.


Was passiert also in Ihrem spezifischen Code? Ohne tatsächliche Daten und Kenntnis des Datensatzformats kann ich es nicht vollständig debuggen, aber ich denke, dass das Problem von dieser Zeile ausgeht:

def vectorizer(text, vocab=vocab_dict):

In Python werden Schlüsselwortargumente initialisiert, wenn die Funktion zum ersten Mal aufgerufen wird. Wenn Sie sc.parallelize(...).map(vectorizer)direkt nach der Definition aufrufen , vocab_dictist dies lokal verfügbar , aber Remote- Mitarbeiter wissen absolut nichts darüber. Daher wird die Funktion mit weniger Parametern aufgerufen, als erwartet wird, was zu einem __init__() takes exactly 3 arguments (2 given)Fehler führt.

Beachten Sie auch, dass Sie einem sehr schlechten Anrufmuster folgen sc.parallelize(...)...collect(). Zuerst verteilen Sie Ihre Sammlung auf den gesamten Cluster, führen einige Berechnungen durch und ziehen dann das Ergebnis ab. Aber das Hin- und Herschicken von Daten ist hier ziemlich sinnlos. Stattdessen können Sie diese Berechnungen nur lokal durchführen und die parallelen Prozesse von Spark nur ausführen, wenn Sie mit wirklich großen Datenmengen arbeiten (wie Sie amazon_dataset, glaube ich).

Freund
quelle