Ich möchte eine CSV in Spark lesen und als DataFrame konvertieren und in HDFS mit speichern df.registerTempTable("table_name")
Ich habe versucht:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Fehler, den ich bekommen habe:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Was ist der richtige Befehl, um eine CSV-Datei als DataFrame in Apache Spark zu laden?
Antworten:
spark-csv ist Teil der Spark-Kernfunktionalität und erfordert keine separate Bibliothek. Sie könnten es also zum Beispiel tun
In scala (dies funktioniert für alle Format-in-Trennzeichen, die "," für csv, "\ t" für tsv usw. erwähnen).
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
quelle
Analysieren Sie CSV und laden Sie es mit Spark 2.x als DataFrame / DataSet
Initialisieren Sie zunächst das
SparkSession
Objekt standardmäßig, das in Shells als verfügbar istspark
1. Machen Sie es programmatisch
Update: Hinzufügen aller Optionen von hier aus, falls der Link in Zukunft unterbrochen wird
2. Sie können diese SQL-Methode auch ausführen
Abhängigkeiten :
Spark-Version <2.0
Abhängigkeiten:
quelle
spark-core_2.11
undspark-sql_2.11
der2.0.1
Version ist in Ordnung. Wenn möglich, fügen Sie die Fehlermeldung hinzu.spark.read.format("csv").option("delimiter ", "|") ...
programmatic way
ausgeschaltet ist, zu gehen.format("csv")
und ersetzen.load(...
mit.csv(...
. Dieoption
Methode gehört zur DataFrameReader-Klasse, wie sie von derread
Methode zurückgegeben wird, wobei die Methodenload
undcsv
einen Datenrahmen zurückgeben, sodass nach dem Aufruf keine Optionen markiert werden können. Diese Antwort ist ziemlich gründlich, aber Sie sollten einen Link zur Dokumentation erstellen, damit die Benutzer alle anderen verfügbaren CSV-Optionen sehen können. Spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrameEs ist für dessen Hadoop 2.6 und Spark 1.6 ist und ohne "Databricks" -Paket.
quelle
Mit Spark 2.0 können Sie CSV wie folgt lesen
quelle
spark.read.csv(path)
undspark.read.format("csv").load(path)
?In Java 1.8 Dieses Code-Snippet funktioniert perfekt zum Lesen von CSV-Dateien
POM.xml
Java
quelle
Das Parsen einer CSV-Datei ist mit vielen Herausforderungen verbunden. Wenn die Dateigröße größer ist und die Spaltenwerte nicht Englisch / Escape / Separator / andere Zeichen enthalten, kann dies zu Analysefehlern führen.
Die Magie liegt dann in den Optionen, die verwendet werden. Diejenigen, die für mich gearbeitet haben und hoffen, dass sie die meisten Randfälle abdecken, sind im folgenden Code aufgeführt:
Hoffentlich hilft das. Weitere Informationen finden Sie unter : Verwenden von PySpark 2 zum Lesen von CSV mit HTML-Quellcode
Hinweis: Der obige Code stammt aus der Spark 2-API, in der die CSV-API zum Lesen von Dateien mit integrierten Paketen von Spark geliefert wird, die installiert werden können.
Hinweis: PySpark ist ein Python-Wrapper für Spark und verwendet dieselbe API wie Scala / Java.
quelle
Pennys Spark 2-Beispiel ist der Weg, dies in spark2 zu tun. Es gibt noch einen weiteren Trick: Lassen Sie diesen Header für Sie generieren, indem Sie einen ersten Scan der Daten durchführen und die Option
inferSchema
auf setzentrue
Angenommen, es
spark
handelt sich um eine Spark-Sitzung, die Sie eingerichtet haben, ist die Operation, die in die CSV-Indexdatei aller Landsat-Bilder geladen wird, die Amazon auf S3 hostet.Die schlechte Nachricht ist: Dies löst einen Scan durch die Datei aus; Für etwas Großes wie diese 20 + MB komprimierte CSV-Datei kann dies über eine Langstreckenverbindung 30 Sekunden dauern. Denken Sie daran: Sie sollten das Schema besser manuell codieren, sobald Sie es erhalten haben.
(Code-Snippet Apache Software License 2.0, lizenziert, um alle Unklarheiten zu vermeiden; etwas, das ich als Demo / Integrationstest der S3-Integration durchgeführt habe)
quelle
Falls Sie ein Glas mit Scala 2.11 und Apache 2.0 oder höher erstellen.
Es ist nicht erforderlich, ein
sqlContext
oder einsparkContext
Objekt zu erstellen . Nur einSparkSession
Objekt genügt für alle Anforderungen.Folgendes ist Mycode, der gut funktioniert:
Wenn Sie im Cluster ausgeführt werden, wechseln Sie einfach
.master("local")
zu,.master("yarn")
während Sie dassparkBuilder
Objekt definierenDas Spark-Dokument behandelt dies: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
quelle
Fügen Sie der POM-Datei folgende Spark-Abhängigkeiten hinzu:
// Spark-Konfiguration:
val spark = SparkSession.builder (). master ("local"). appName ("Beispiel-App"). getOrCreate ()
// CSV-Datei lesen:
val df = spark.read.option ("header", "true"). csv ("FILE_PATH")
// Ausgabe anzeigen
df.show ()
quelle
Um aus dem relativen Pfad auf dem System zu lesen, verwenden Sie die System.getProperty-Methode, um das aktuelle Verzeichnis abzurufen, und verwenden Sie außerdem das Laden der Datei unter Verwendung des relativen Pfads.
Funke: 2.4.4 Scala: 2.11.12
quelle
Wenn Sie mit Spark 2.4+ eine CSV aus einem lokalen Verzeichnis laden möchten, können Sie zwei Sitzungen verwenden und diese in den Hive laden. Die erste Sitzung sollte mit master () config als "local [*]" und die zweite Sitzung mit "yarn" und Hive aktiviert sein.
Das folgende hat für mich funktioniert.
Als es lief
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
, ging es gut und erstellte den Tisch im Bienenstock.quelle
Das Standarddateiformat ist Parkett mit spark.read .. und CSV zum Lesen von Dateien, weshalb Sie die Ausnahme erhalten. Geben Sie das CSV-Format mit der API an, die Sie verwenden möchten
quelle
Versuchen Sie dies, wenn Sie Spark 2.0+ verwenden
Hinweis: - Dies funktioniert für alle durch Trennzeichen getrennten Dateien. Verwenden Sie einfach die Option ("Trennzeichen"), um den Wert zu ändern.
Hoffe das ist hilfreich.
quelle
Mit der integrierten Spark-CSV können Sie dies problemlos mit dem neuen SparkSession-Objekt für Spark> 2.0 erledigen.
Es gibt verschiedene Optionen, die Sie einstellen können.
header
: ob Ihre Datei oben eine Kopfzeile enthältinferSchema
: ob Sie das Schema automatisch ableiten möchten oder nicht. Standard isttrue
. Ich bevorzuge es immer, ein Schema bereitzustellen, um die richtigen Datentypen sicherzustellen.mode
: Parsing-Modus, PERMISSIVE, DROPMALFORMED oder FAILFASTdelimiter
: Um ein Trennzeichen anzugeben, ist der Standardwert Komma (',').quelle