So laden Sie eine lokale Datei in sc.textFile anstelle von HDFS

100

Ich folge dem großartigen Funken-Tutorial

Also versuche ich um 46:00 Uhr, das zu laden, README.mdaber ich scheitere an dem, was ich tue, folgendes:

$ sudo docker run -i -t -h sandbox sequenceiq/spark:1.1.0 /etc/bootstrap.sh -bash
bash-4.1# cd /usr/local/spark-1.1.0-bin-hadoop2.4
bash-4.1# ls README.md
README.md
bash-4.1# ./bin/spark-shell
scala> val f = sc.textFile("README.md")
14/12/04 12:11:14 INFO storage.MemoryStore: ensureFreeSpace(164073) called with curMem=0, maxMem=278302556
14/12/04 12:11:14 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.2 KB, free 265.3 MB)
f: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

Wie kann ich das laden README.md?

Jas
quelle

Antworten:

177

Versuchen Sie es explizit anzugeben sc.textFile("file:///path to the file/"). Der Fehler tritt auf, wenn die Hadoop-Umgebung festgelegt ist.

SparkContext.textFile ruft intern auf org.apache.hadoop.mapred.FileInputFormat.getSplits, was wiederum verwendet wird, org.apache.hadoop.fs.getDefaultUriwenn kein Schema vorhanden ist. Diese Methode liest den Parameter "fs.defaultFS" von Hadoop conf. Wenn Sie die Umgebungsvariable HADOOP_CONF_DIR festlegen, wird der Parameter normalerweise als "hdfs: // ..." festgelegt. sonst "file: //".

suztomo
quelle
Wissen Sie zufällig, wie man das mit Java macht? Ich sehe keine Methode. Es ist sehr frustrierend, dass es keine einfache Möglichkeit gibt, einen Pfad zum Laden einer Datei aus einem einfachen Dateisystem anzugeben.
Brad Ellis
Ich antworte mir. Es gibt einen --file-Schalter, den Sie mit dem Spark-Submit übergeben. Der Dateipfad kann also fest codiert sein oder Ihre Konfiguration ist für die App eingerichtet, aber Sie signalisieren auch diesen Pfad. wenn Sie einreichen, damit die Ausführenden den Pfad sehen können.
Brad Ellis
24

Gonbes Antwort ist ausgezeichnet. Trotzdem möchte ich das erwähnen file:///= ~/../../, nicht $SPARK_HOME. Hoffe das könnte etwas Zeit für Neulinge wie mich sparen.

zaxliu
quelle
4
file:///ist der Stammordner des Dateisystems, wie er von der ausführenden JVM gesehen wird, nicht zwei Ebenen über dem Basisordner. Das in RFC 8089 angegebene URI-Format lautet file://hostname/absolute/path. Im lokalen Fall ist die hostname(Berechtigungs-) Komponente leer.
Hristo Iliev
17

Während Spark das Laden von Dateien aus dem lokalen Dateisystem unterstützt, müssen die Dateien auf allen Knoten in Ihrem Cluster unter demselben Pfad verfügbar sein.

Einige Netzwerkdateisysteme, wie NFS, AFS und die NFS-Schicht von MapR, werden dem Benutzer als reguläres Dateisystem zur Verfügung gestellt.

Wenn sich Ihre Daten bereits in einem dieser Systeme befinden, können Sie sie als Eingabe verwenden, indem Sie einfach eine Datei angeben: // path; Spark wird damit umgehen, solange das Dateisystem auf jedem Knoten unter demselben Pfad bereitgestellt wird. Jeder Knoten muss denselben Pfad haben

 rdd = sc.textFile("file:///path/to/file")

Wenn Ihre Datei nicht bereits auf allen Knoten im Cluster vorhanden ist, können Sie sie lokal auf den Treiber laden, ohne Spark zu durchlaufen, und dann parallelize aufrufen, um den Inhalt an die Worker zu verteilen

Achten Sie darauf, dass Sie file: // voranstellen und je nach Betriebssystem "/" oder "\" verwenden.

Aklank Jain
quelle
1
Gibt es eine Möglichkeit, dass Spark Daten automatisch aus seinem Verzeichnis $ SPARK_HOME auf alle Rechenknoten kopiert? Oder müssen Sie das manuell machen?
Matthias
Wo verarbeitet der Spark-Quellcode verschiedene Dateisystemformate?
Saher Ahwal
12

Sie müssen nur den Pfad der Datei als "Datei: /// Verzeichnis / Datei" angeben.

Beispiel:

val textFile = sc.textFile("file:///usr/local/spark/README.md")
Hamdi Charef
quelle
12

Beachtung:

Stellen Sie sicher, dass Sie spark im lokalen Modus ausführen, wenn Sie Daten von local ( sc.textFile("file:///path to the file/")) laden. Andernfalls wird eine solche Fehlermeldung angezeigt Caused by: java.io.FileNotFoundException: File file:/data/sparkjob/config2.properties does not exist. Da Executoren, die auf verschiedenen Workern ausgeführt werden, diese Datei nicht im lokalen Pfad finden.

Matiji66
quelle
10

Befindet sich die Datei in Ihrem Spark-Masterknoten (z. B. bei Verwendung von AWS EMR), starten Sie die Spark-Shell zuerst im lokalen Modus.

$ spark-shell --master=local
scala> val df = spark.read.json("file:///usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Alternativ können Sie die Datei zuerst aus dem lokalen Dateisystem in HDFS kopieren und dann Spark im Standardmodus (z. B. YARN bei Verwendung von AWS EMR) starten, um die Datei direkt zu lesen.

$ hdfs dfs -mkdir -p /hdfs/spark/examples
$ hadoop fs -put /usr/lib/spark/examples/src/main/resources/people.json /hdfs/spark/examples
$ hadoop fs -ls /hdfs/spark/examples
Found 1 items
-rw-r--r--   1 hadoop hadoop         73 2017-05-01 00:49 /hdfs/spark/examples/people.json

$ spark-shell
scala> val df = spark.read.json("/hdfs/spark/examples/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
Joarder Kamal
quelle
9

Ich habe eine Datei namens NewsArticle.txt auf meinem Desktop.

In Spark habe ich Folgendes eingegeben:

val textFile= sc.textFile(“file:///C:/Users/582767/Desktop/NewsArticle.txt”)

Ich musste alle \ to / Zeichen für den Dateipfad ändern.

Um zu testen, ob es funktioniert hat, habe ich Folgendes eingegeben:

textFile.foreach(println)

Ich verwende Windows 7 und habe Hadoop nicht installiert.

Gen
quelle
5

Dies wurde in der Spark-Mailingliste besprochen. Bitte beziehen Sie sich auf diese Mail .

Sie sollten hadoop fs -put <localsrc> ... <dst>die Datei kopieren in hdfs:

${HADOOP_COMMON_HOME}/bin/hadoop fs -put /path/to/README.md README.md
Nan Xiao
quelle
5

Dies ist mir mit Spark 2.3 passiert, wobei Hadoop auch im gemeinsamen Benutzerverzeichnis "hadoop" installiert ist. Da sowohl Spark als auch Hadoop im selben gemeinsamen Verzeichnis installiert wurden, betrachtet Spark das Schema standardmäßig als hdfsund beginnt mit der Suche nach den Eingabedateien unter hdfs wie von fs.defaultFSin Hadoop's angegeben core-site.xml. In solchen Fällen müssen wir das Schema explizit als angeben file:///<absoloute path to file>.

Binita Bharati
quelle
0

Dies ist die Lösung für diesen Fehler, den ich auf einem Spark-Cluster erhalten habe, der in Azure auf einem Windows-Cluster gehostet wird:

Laden Sie die unformatierte Datei HVAC.csv und analysieren Sie sie mit der Funktion

data = sc.textFile("wasb:///HdiSamples/SensorSampleData/hvac/HVAC.csv")

Wir verwenden (wasb: ///), um Hadoop den Zugriff auf die Azure-Blog-Speicherdatei zu ermöglichen, und die drei Schrägstriche sind eine relative Referenz zum laufenden Knotencontainerordner.

Beispiel: Wenn der Pfad für Ihre Datei im Datei-Explorer im Spark-Cluster-Dashboard wie folgt lautet:

sflcc1 \ sflccspark1 \ HdiSamples \ SensorSampleData \ hvac

Der Pfad wird also wie folgt beschrieben: sflcc1: ist der Name des Speicherkontos. sflccspark: ist der Name des Clusterknotens.

Wir beziehen uns also auf den aktuellen Namen des Clusterknotens mit den relativen drei Schrägstrichen.

Hoffe das hilft.

Mostafa
quelle
0

Wenn Sie versuchen, die Datei aus HDFS zu lesen. Versuchen Sie, den Pfad in SparkConf festzulegen

 val conf = new SparkConf().setMaster("local[*]").setAppName("HDFSFileReader")
 conf.set("fs.defaultFS", "hdfs://hostname:9000")
Viyaan Jhiingade
quelle
Fügen Sie Ihrem Code einen Einzug mit 4 Leerzeichen / Tabulatoren hinzu, damit er als Code formatiert wird.
Viele
0

Sie müssen sc.textFile (...) nicht verwenden, um lokale Dateien in Datenrahmen zu konvertieren. Eine der Möglichkeiten besteht darin, eine lokale Datei Zeile für Zeile zu lesen und sie dann in einen Spark-Datensatz umzuwandeln. Hier ist ein Beispiel für einen Windows-Computer in Java:

StructType schemata = DataTypes.createStructType(
            new StructField[]{
                    createStructField("COL1", StringType, false),
                    createStructField("COL2", StringType, false),
                    ...
            }
    );

String separator = ";";
String filePath = "C:\\work\\myProj\\myFile.csv";
SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("MyApp").setMaster("local"));
JavaSparkContext jsc = new JavaSparkContext (sparkContext );
SQLContext sqlContext = SQLContext.getOrCreate(sparkContext );

List<String[]> result = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
    String line;
    while ((line = br.readLine()) != null) {
      String[] vals = line.split(separator);
      result.add(vals);
    }
 } catch (Exception ex) {
       System.out.println(ex.getMessage());
       throw new RuntimeException(ex);
  }
  JavaRDD<String[]> jRdd = jsc.parallelize(result);
  JavaRDD<Row> jRowRdd = jRdd .map(RowFactory::create);
  Dataset<Row> data = sqlContext.createDataFrame(jRowRdd, schemata);

Jetzt können Sie den Datenrahmen datain Ihrem Code verwenden.

Andrushenko Alexander
quelle
0

Ich habe Folgendes versucht und es hat von meinem lokalen Dateisystem aus funktioniert. Grundsätzlich kann Spark vom lokalen, HDFS- und AWS S3-Pfad lesen

listrdd=sc.textFile("file:////home/cloudera/Downloads/master-data/retail_db/products")
BigData-Guru
quelle
-6

Versuchen

val f = sc.textFile("./README.md")
Soumya Simanta
quelle
scala> val f = sc.textFile("./README.md") 14/12/04 12:54:33 INFO storage.MemoryStore: ensureFreeSpace(81443) called with curMem=164073, maxMem=278302556 14/12/04 12:54:33 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 79.5 KB, free 265.2 MB) f: org.apache.spark.rdd.RDD[String] = ./README.md MappedRDD[5] at textFile at <console>:12 scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md at
Jas
Können Sie eine pwdauf der Bash Shell machenbash-4.1#
Soumya Simanta
bash-4.1 # pwd /usr/local/spark-1.1.0-bin-hadoop2.4
Jas
Dies funktioniert bei mir auf Funken ohne Hadoop / HDFS. Es scheint jedoch nicht für das OP zu funktionieren, da es ihnen einen Fehlerdump gab.
Paul