Ich möchte eine Reihe von Textdateien von einem HDFS-Speicherort lesen und eine Zuordnung in einer Iteration mit Spark durchführen.
JavaRDD<String> records = ctx.textFile(args[1], 1);
kann jeweils nur eine Datei lesen.
Ich möchte mehr als eine Datei lesen und als einzelne RDD verarbeiten. Wie?
apache-spark
user3705662
quelle
quelle
Path
gelten dieselben Optionen.sc.wholeTextFiles
ist praktisch für Daten, die nicht durch Zeilen begrenzt sindsc.textFile(multipleCommaSeparatedDirs,320)
Sie , dass dies zu19430
Gesamtaufgaben führt, anstatt320
... es sich so verhält , dass esunion
auch zu einer wahnsinnigen Anzahl von Aufgaben aufgrund einer sehr geringen Parallelität führtwholeTextFiles
. Was ist Ihr Anwendungsfall? Ich kann mir eineVerwenden Sie
union
wie folgt:Dann
bigRdd
ist das die RDD mit allen Dateien.quelle
Sie können einen einzelnen textFile-Aufruf verwenden, um mehrere Dateien zu lesen. Scala:
quelle
sc.textFile(files.mkString(","))
Sie können dies verwenden
Zuerst können Sie einen Puffer / eine Liste von S3-Pfaden erhalten:
Übergeben Sie dieses List-Objekt nun an den folgenden Code. Hinweis: sc ist ein Objekt von SQLContext
Jetzt haben Sie eine endgültige Unified RDD, dh df
Optional, und Sie können es auch in einer einzelnen BigRDD neu partitionieren
Die Neupartitionierung funktioniert immer: D.
quelle
In PySpark habe ich einen zusätzlichen nützlichen Weg gefunden, um Dateien zu analysieren. Vielleicht gibt es in Scala ein Äquivalent, aber ich bin nicht zufrieden genug mit einer funktionierenden Übersetzung. Tatsächlich handelt es sich um einen textFile-Aufruf mit zusätzlichen Beschriftungen (im folgenden Beispiel der Schlüssel = Dateiname, Wert = 1 Zeile aus der Datei).
"Beschriftete" Textdatei
Eingang:
Ausgabe: Array mit jedem Eintrag, der ein Tupel enthält, mit Dateiname als Schlüssel und mit Wert = jede Dateizeile. (Technisch gesehen können Sie mit dieser Methode neben dem eigentlichen Dateipfadnamen auch einen anderen Schlüssel verwenden - möglicherweise eine Hashing-Darstellung, um Speicherplatz zu sparen.) dh.
Sie können auch eine der beiden Zeilen neu kombinieren:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
Oder kombinieren Sie ganze Dateien wieder zu einzelnen Zeichenfolgen (in diesem Beispiel ist das Ergebnis das gleiche wie das, das Sie von WholeTextFiles erhalten, jedoch mit der Zeichenfolge "file:", die aus dem Dateipfad entfernt wurde.):
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
quelle
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
ausführte, bekam ich den Fehler dhTypeError: 'PipelinedRDD' object is not iterable
. Ich verstehe, dass diese Zeile eine unveränderliche RDD erstellt. Ich habe mich gefragt, wie Sie sie an eine andere Variable anhängen können.Sie können verwenden
Hier erhalten Sie den Pfad Ihrer Datei und den Inhalt dieser Datei. So können Sie jede Aktion einer ganzen Datei gleichzeitig ausführen, wodurch der Overhead gespart wird
quelle
Alle Antworten sind richtig mit
sc.textFile
Ich habe mich nur gefragt, warum nicht.
wholeTextFiles
Zum Beispiel in diesem Fall ...Eine Einschränkung besteht darin, dass wir kleine Dateien laden müssen, da sonst die Leistung schlecht wird und zu OOM führen kann.
Hinweis :
Weitere Hinweise zum Besuch
quelle
sc.wholeTextFiles(folder).flatMap...
Es gibt eine einfache, saubere Lösung. Verwenden Sie die Methode largeTextFiles (). Dies nimmt ein Verzeichnis und bildet ein Schlüsselwertpaar. Die zurückgegebene RDD ist eine Paar-RDD. Nachfolgend finden Sie die Beschreibung aus den Spark-Dokumenten :
quelle
THIS TRY - Schnittstelle verwendet , um einen Datenrahmen auf externe Speichersysteme (zB Dateisysteme, Schlüssel-Wert - Speicher, etc.) zu schreiben. Verwenden Sie DataFrame.write (), um darauf zuzugreifen.
Neu in Version 1.4.
csv (Pfad, Modus = Keine, Komprimierung = Keine, Sep = Keine, Anführungszeichen = Keine, Escape = Keine, Header = Keine, nullValue = Keine, EscapeQuotes = Keine, QuoteAll = Keine, DateFormat = Keine, TimestampFormat = Keine) Speichert die Inhalt des DataFrame im CSV-Format unter dem angegebenen Pfad.
Parameter: path - Der Pfad in einem von Hadoop unterstützten Dateisystemmodus - gibt das Verhalten des Speichervorgangs an, wenn bereits Daten vorhanden sind.
Anhängen: Hängt den Inhalt dieses DataFrames an vorhandene Daten an. überschreiben: Überschreibt vorhandene Daten. Ignorieren: Ignorieren Sie diesen Vorgang stillschweigend, wenn bereits Daten vorhanden sind. Fehler (Standardfall): Löst eine Ausnahme aus, wenn bereits Daten vorhanden sind. Komprimierung - Komprimierungscodec, der beim Speichern in einer Datei verwendet wird. Dies kann einer der bekannten Kurznamen sein, bei denen die Groß- und Kleinschreibung nicht berücksichtigt wird (keine, bzip2, gzip, lz4, bissig und entleert). sep - Setzt das einzelne Zeichen als Trennzeichen für jedes Feld und jeden Wert. Wenn Keine festgelegt ist, wird der Standardwert ,, verwendet. quote - Legt das einzelne Zeichen fest, das zum Escapezeichen von Anführungszeichen verwendet wird, wobei das Trennzeichen Teil des Werts sein kann. Wenn Keine festgelegt ist, wird der Standardwert "" verwendet. Wenn Sie Anführungszeichen deaktivieren möchten, müssen Sie eine leere Zeichenfolge festlegen. Escape - Legt das einzelne Zeichen fest, das zum Escapezeichen von Anführungszeichen innerhalb eines bereits in Anführungszeichen gesetzten Werts verwendet wird. Wenn Keine festgelegt ist , Es wird der Standardwert \ EscapeQuotes verwendet. Ein Flag, das angibt, ob Werte, die Anführungszeichen enthalten, immer in Anführungszeichen eingeschlossen werden sollen. Wenn Keine festgelegt ist, wird der Standardwert true verwendet, wobei alle Werte, die ein Anführungszeichen enthalten, maskiert werden. quoteAll - Ein Flag, das angibt, ob alle Werte immer in Anführungszeichen gesetzt werden sollen. Wenn Keine festgelegt ist, wird der Standardwert false verwendet, wobei nur Werte maskiert werden, die ein Anführungszeichen enthalten. Header - Schreibt die Namen der Spalten als erste Zeile. Wenn Keine festgelegt ist, wird der Standardwert false verwendet. nullValue - Legt die Zeichenfolgendarstellung eines Nullwerts fest. Wenn Keine festgelegt ist, wird der Standardwert leere Zeichenfolge verwendet. dateFormat - Legt die Zeichenfolge fest, die ein Datumsformat angibt. Benutzerdefinierte Datumsformate folgen den Formaten unter java.text.SimpleDateFormat. Dies gilt für den Datumstyp. Wenn Keine festgelegt ist, wird der Standardwert JJJJ-MM-TT verwendet. timestampFormat - Legt die Zeichenfolge fest, die ein Zeitstempelformat angibt. Benutzerdefinierte Datumsformate folgen den Formaten unter java.text.SimpleDateFormat. Dies gilt für den Zeitstempeltyp. Wenn Keine festgelegt ist, wird der Standardwert yyyy-MM-dd'T'HH: mm: ss.SSSZZ verwendet.
quelle
quelle