Wie lese ich mehrere Textdateien in eine einzige RDD?

178

Ich möchte eine Reihe von Textdateien von einem HDFS-Speicherort lesen und eine Zuordnung in einer Iteration mit Spark durchführen.

JavaRDD<String> records = ctx.textFile(args[1], 1); kann jeweils nur eine Datei lesen.

Ich möchte mehr als eine Datei lesen und als einzelne RDD verarbeiten. Wie?

user3705662
quelle

Antworten:

298

Sie können ganze Verzeichnisse angeben, Platzhalter und sogar CSV von Verzeichnissen und Platzhaltern verwenden. Z.B:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

Wie Nick Chammas betont, ist dies eine Belichtung von Hadoop FileInputFormatund daher funktioniert dies auch mit Hadoop (und Scalding).

samthebest
quelle
10
Ja, dies ist der bequemste Weg, um mehrere Dateien als eine einzige RDD zu öffnen. Die API hier ist nur eine Darstellung der FileInputFormat-API von Hadoop , daher Pathgelten dieselben Optionen.
Nick Chammas
7
sc.wholeTextFilesist praktisch für Daten, die nicht durch Zeilen begrenzt sind
Michal Čizmazia
1
Es ist jedoch seltsam, dass, wenn Sie dies tun und Parallelität angeben, sagen sc.textFile(multipleCommaSeparatedDirs,320)Sie , dass dies zu 19430Gesamtaufgaben führt, anstatt 320... es sich so verhält , dass es unionauch zu einer wahnsinnigen Anzahl von Aufgaben aufgrund einer sehr geringen Parallelität führt
lisak
2
Ich habe endlich herausgefunden, wie dieser böse Dateimusterabgleich auf stackoverflow.com/a/33917492/306488 funktioniert, sodass ich keine Komma-
Begrenzung
@femibyte Ich glaube nicht, obwohl ich nicht weiß, warum Sie den Dateinamen in einer anderen Situation als für wissen möchten wholeTextFiles. Was ist Ihr Anwendungsfall? Ich kann mir eine
Problemumgehung vorstellen,
35

Verwenden Sie unionwie folgt:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

Dann bigRddist das die RDD mit allen Dateien.

Wolke
quelle
Danke Cloud, auf diese Weise kann ich alle gewünschten Dateien lesen, bis auf eine! Aber trotzdem muss ich eine Menge Dinge schreiben ...
gsamaras
30

Sie können einen einzelnen textFile-Aufruf verwenden, um mehrere Dateien zu lesen. Scala:

sc.textFile(','.join(files)) 
Joseph
quelle
5
und identische Python-Syntax
Patricksurry
8
Ich denke, das ist nur Python-Syntax. Das Scala-Äquivalent wäresc.textFile(files.mkString(","))
Davos
9

Sie können dies verwenden

Zuerst können Sie einen Puffer / eine Liste von S3-Pfaden erhalten:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Übergeben Sie dieses List-Objekt nun an den folgenden Code. Hinweis: sc ist ein Objekt von SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

Jetzt haben Sie eine endgültige Unified RDD, dh df

Optional, und Sie können es auch in einer einzelnen BigRDD neu partitionieren

val files = sc.textFile(filename, 1).repartition(1)

Die Neupartitionierung funktioniert immer: D.

Murtaza Kanchwala
quelle
Bedeutet das nicht, dass die Dateiliste relativ klein sein muss? Nicht Millionen von Dateien.
Mathieu Longtin
2
Können wir den Vorgang des Lesens der aufgelisteten Dateien parallelisieren? so etwas wie sc.parallelize?
Lazywiz
1
@MathieuLongtin: Wenn Sie die Partitionserkennung auf Ihren Spark-Code anwenden können, ist es großartig, wenn Sie das Gleiche tun müssen. Ich habe 10k-Dateien in ca. einer Minute geöffnet.
Murtaza Kanchwala
@lazywiz Wenn Sie keine einzige Festplatte erstellen möchten, entfernen Sie einfach die Neupartitionierungsaktion.
Murtaza Kanchwala
3

In PySpark habe ich einen zusätzlichen nützlichen Weg gefunden, um Dateien zu analysieren. Vielleicht gibt es in Scala ein Äquivalent, aber ich bin nicht zufrieden genug mit einer funktionierenden Übersetzung. Tatsächlich handelt es sich um einen textFile-Aufruf mit zusätzlichen Beschriftungen (im folgenden Beispiel der Schlüssel = Dateiname, Wert = 1 Zeile aus der Datei).

"Beschriftete" Textdatei

Eingang:

import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)

for filename in glob.glob(Data_File + "/*"):
    Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

Ausgabe: Array mit jedem Eintrag, der ein Tupel enthält, mit Dateiname als Schlüssel und mit Wert = jede Dateizeile. (Technisch gesehen können Sie mit dieser Methode neben dem eigentlichen Dateipfadnamen auch einen anderen Schlüssel verwenden - möglicherweise eine Hashing-Darstellung, um Speicherplatz zu sparen.) dh.

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
 ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
  ...]

Sie können auch eine der beiden Zeilen neu kombinieren:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

Oder kombinieren Sie ganze Dateien wieder zu einzelnen Zeichenfolgen (in diesem Beispiel ist das Ergebnis das gleiche wie das, das Sie von WholeTextFiles erhalten, jedoch mit der Zeichenfolge "file:", die aus dem Dateipfad entfernt wurde.):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

Abby Sobh
quelle
Als ich diese Codezeile Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) ausführte, bekam ich den Fehler dh TypeError: 'PipelinedRDD' object is not iterable. Ich verstehe, dass diese Zeile eine unveränderliche RDD erstellt. Ich habe mich gefragt, wie Sie sie an eine andere Variable anhängen können.
KartikKannapur
3

Sie können verwenden

JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")

Hier erhalten Sie den Pfad Ihrer Datei und den Inhalt dieser Datei. So können Sie jede Aktion einer ganzen Datei gleichzeitig ausführen, wodurch der Overhead gespart wird

Shubham Agrawal
quelle
2

Alle Antworten sind richtig mit sc.textFile

Ich habe mich nur gefragt, warum nicht. wholeTextFilesZum Beispiel in diesem Fall ...

val minPartitions = 2
val path = "/pathtohdfs"
    sc.wholeTextFiles(path,minPartitions)
      .flatMap{case (path, text) 
    ...

Eine Einschränkung besteht darin, dass wir kleine Dateien laden müssen, da sonst die Leistung schlecht wird und zu OOM führen kann.

Hinweis :

  • Die gesamte Datei sollte in den Speicher passen
  • Gut für Dateiformate, die NICHT zeilenweise aufteilbar sind ... wie XML-Dateien

Weitere Hinweise zum Besuch

Ram Ghadiyaram
quelle
oder einfachsc.wholeTextFiles(folder).flatMap...
Evhz
sc.wholeTextFiles ("/ path / to / dir")
Ram Ghadiyaram
1

Es gibt eine einfache, saubere Lösung. Verwenden Sie die Methode largeTextFiles (). Dies nimmt ein Verzeichnis und bildet ein Schlüsselwertpaar. Die zurückgegebene RDD ist eine Paar-RDD. Nachfolgend finden Sie die Beschreibung aus den Spark-Dokumenten :

Mit SparkContext.wholeTextFiles können Sie ein Verzeichnis lesen, das mehrere kleine Textdateien enthält, und jedes als Paar (Dateiname, Inhalt) zurückgeben. Dies steht im Gegensatz zu textFile, bei der in jeder Datei ein Datensatz pro Zeile zurückgegeben wird

Harikrishnan Ck
quelle
-1

THIS TRY - Schnittstelle verwendet , um einen Datenrahmen auf externe Speichersysteme (zB Dateisysteme, Schlüssel-Wert - Speicher, etc.) zu schreiben. Verwenden Sie DataFrame.write (), um darauf zuzugreifen.

Neu in Version 1.4.

csv (Pfad, Modus = Keine, Komprimierung = Keine, Sep = Keine, Anführungszeichen = Keine, Escape = Keine, Header = Keine, nullValue = Keine, EscapeQuotes = Keine, QuoteAll = Keine, DateFormat = Keine, TimestampFormat = Keine) Speichert die Inhalt des DataFrame im CSV-Format unter dem angegebenen Pfad.

Parameter: path - Der Pfad in einem von Hadoop unterstützten Dateisystemmodus - gibt das Verhalten des Speichervorgangs an, wenn bereits Daten vorhanden sind.

Anhängen: Hängt den Inhalt dieses DataFrames an vorhandene Daten an. überschreiben: Überschreibt vorhandene Daten. Ignorieren: Ignorieren Sie diesen Vorgang stillschweigend, wenn bereits Daten vorhanden sind. Fehler (Standardfall): Löst eine Ausnahme aus, wenn bereits Daten vorhanden sind. Komprimierung - Komprimierungscodec, der beim Speichern in einer Datei verwendet wird. Dies kann einer der bekannten Kurznamen sein, bei denen die Groß- und Kleinschreibung nicht berücksichtigt wird (keine, bzip2, gzip, lz4, bissig und entleert). sep - Setzt das einzelne Zeichen als Trennzeichen für jedes Feld und jeden Wert. Wenn Keine festgelegt ist, wird der Standardwert ,, verwendet. quote - Legt das einzelne Zeichen fest, das zum Escapezeichen von Anführungszeichen verwendet wird, wobei das Trennzeichen Teil des Werts sein kann. Wenn Keine festgelegt ist, wird der Standardwert "" verwendet. Wenn Sie Anführungszeichen deaktivieren möchten, müssen Sie eine leere Zeichenfolge festlegen. Escape - Legt das einzelne Zeichen fest, das zum Escapezeichen von Anführungszeichen innerhalb eines bereits in Anführungszeichen gesetzten Werts verwendet wird. Wenn Keine festgelegt ist , Es wird der Standardwert \ EscapeQuotes verwendet. Ein Flag, das angibt, ob Werte, die Anführungszeichen enthalten, immer in Anführungszeichen eingeschlossen werden sollen. Wenn Keine festgelegt ist, wird der Standardwert true verwendet, wobei alle Werte, die ein Anführungszeichen enthalten, maskiert werden. quoteAll - Ein Flag, das angibt, ob alle Werte immer in Anführungszeichen gesetzt werden sollen. Wenn Keine festgelegt ist, wird der Standardwert false verwendet, wobei nur Werte maskiert werden, die ein Anführungszeichen enthalten. Header - Schreibt die Namen der Spalten als erste Zeile. Wenn Keine festgelegt ist, wird der Standardwert false verwendet. nullValue - Legt die Zeichenfolgendarstellung eines Nullwerts fest. Wenn Keine festgelegt ist, wird der Standardwert leere Zeichenfolge verwendet. dateFormat - Legt die Zeichenfolge fest, die ein Datumsformat angibt. Benutzerdefinierte Datumsformate folgen den Formaten unter java.text.SimpleDateFormat. Dies gilt für den Datumstyp. Wenn Keine festgelegt ist, wird der Standardwert JJJJ-MM-TT verwendet. timestampFormat - Legt die Zeichenfolge fest, die ein Zeitstempelformat angibt. Benutzerdefinierte Datumsformate folgen den Formaten unter java.text.SimpleDateFormat. Dies gilt für den Zeitstempeltyp. Wenn Keine festgelegt ist, wird der Standardwert yyyy-MM-dd'T'HH: mm: ss.SSSZZ verwendet.

K Rakesh Patra
quelle
-4
rdd = textFile('/data/{1.txt,2.txt}')
Alya
quelle