Ich bin neu in Spark und versuche, CSV-Daten aus einer Datei mit Spark zu lesen. Folgendes mache ich:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Ich würde erwarten, dass dieser Aufruf mir eine Liste der beiden ersten Spalten meiner Datei gibt, aber ich erhalte folgende Fehlermeldung:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
obwohl meine CSV-Datei als mehr als eine Spalte.
python
csv
apache-spark
pyspark
Kernael
quelle
quelle
csv
Bibliothek zu analysieren , um alle Escapezeichen zu verarbeiten, da das einfache Aufteilen durch Komma nicht funktioniert, wenn die Werte beispielsweise Kommas enthalten.","
.Spark 2.0.0+
Sie können die integrierte CSV-Datenquelle direkt verwenden:
oder
ohne Berücksichtigung externer Abhängigkeiten.
Funke <2.0.0 :
Anstelle einer manuellen Analyse, die im Allgemeinen alles andere als trivial ist, würde ich empfehlen
spark-csv
:Stellen Sie sicher , dass Funken CSV in dem Pfad enthalten (
--packages
,--jars
,--driver-class-path
)Und laden Sie Ihre Daten wie folgt:
Es kann das Laden, das Ableiten von Schemas und das Löschen fehlerhafter Zeilen verarbeiten und erfordert keine Übergabe von Daten von Python an die JVM.
Hinweis :
Wenn Sie das Schema kennen, ist es besser, Schema-Inferenzen zu vermeiden und an zu übergeben
DataFrameReader
. Angenommen, Sie haben drei Spalten - Integer, Double und String:quelle
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(stellen Sie sicher, dass Sie die Databricks / Spark-Versionen auf die von Ihnen installierten ändern).quelle
Und noch eine Option, die darin besteht, die CSV-Datei mit Pandas zu lesen und dann den Pandas DataFrame in Spark zu importieren.
Beispielsweise:
quelle
Durch einfaches Teilen durch Komma werden auch Kommas innerhalb von Feldern (z. B.
a,b,"1,2,3",c
) geteilt, daher wird dies nicht empfohlen. Die Antwort von zero323 ist gut, wenn Sie die DataFrames-API verwenden möchten, aber wenn Sie sich an Base Spark halten möchten, können Sie CSVs in Basis-Python mit dem CSV- Modul analysieren :BEARBEITEN: Wie in den Kommentaren bei @muon erwähnt, wird der Header wie jede andere Zeile behandelt, sodass Sie ihn manuell extrahieren müssen. Beispiel:
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(Stellen Sie sicher, dass Sie keine Änderungen vornehmen,header
bevor der Filter ausgewertet wird.) Aber an diesem Punkt ist es wahrscheinlich besser, einen integrierten CSV-Parser zu verwenden.quelle
StringIO
.csv
kann jedes iterable verwenden b)__next__
sollte nicht direkt verwendet werden und schlägt in einer leeren Zeile fehl. Werfen Sie einen Blick auf flatMap c) Es wäre viel effizienter zu verwenden,mapPartitions
anstatt den Leser in jeder Zeile zu initialisieren :)rdd.mapPartitions(lambda x: csv.reader(x))
Arbeit, währendrdd.map(lambda x: csv.reader(x))
ein Fehler ausgelöst wird? Ich hatte erwartet, dass beide gleich werfen würdenTypeError: can't pickle _csv.reader objects
. Es scheint auch so, als würdemapPartitions
automatisch ein Äquivalent zu "Readlines" für dascsv.reader
Objektmap
aufgerufen , wobei ich mit__next__
explizit aufrufen musste , um die Listen aus dem zu entfernencsv.reader
. 2) Wo kommt esflatMap
rein? NurmapPartitions
alleine anzurufen hat für mich funktioniert.rdd.mapPartitions(lambda x: csv.reader(x))
funktioniert, weilmapPartitions
einIterable
Objekt erwartet . Wenn Sie explizit sein möchten, können Sie Verständnis oder Generatorausdruck.map
allein funktioniert nicht, weil es nicht über Objekt iteriert. Daher mein Verwendungsvorschlag,flatMap(lambda x: csv.reader([x]))
der über den Leser iteriert.mapPartitions
Ist hier aber viel besser.Dies ist in PYSPARK
Dann können Sie überprüfen
quelle
Wenn Sie csv als Datenrahmen laden möchten, können Sie Folgendes tun:
Es hat gut für mich funktioniert.
quelle
Dies steht im Einklang mit dem, was JP Mercier ursprünglich zur Verwendung von Pandas vorgeschlagen hatte, jedoch mit einer wesentlichen Änderung: Wenn Sie Daten in Blöcken in Pandas einlesen, sollten diese formbarer sein. Das bedeutet, dass Sie eine viel größere Datei analysieren können, als Pandas tatsächlich als Einzelstück verarbeiten kann, und sie in kleineren Größen an Spark übergeben können. (Dies beantwortet auch den Kommentar, warum man Spark verwenden möchte, wenn sie sowieso alles in Pandas laden können.)
quelle
Jetzt gibt es auch eine andere Option für jede allgemeine CSV-Datei: https://github.com/seahboonsiew/pyspark-csv wie folgt:
Angenommen, wir haben den folgenden Kontext
Verteilen Sie zunächst pyspark-csv.py mithilfe von SparkContext an Ausführende
Lesen Sie CSV-Daten über SparkContext und konvertieren Sie sie in DataFrame
quelle
Wenn Ihre CSV-Daten in keinem der Felder Zeilenumbrüche enthalten, können Sie Ihre Daten mit laden
textFile()
und analysierenquelle
Wenn Sie eine oder mehrere Zeilen mit weniger oder mehr Spalten als 2 im Dataset haben, kann dieser Fehler auftreten.
Ich bin auch neu in Pyspark und versuche, eine CSV-Datei zu lesen. Der folgende Code hat bei mir funktioniert:
In diesem Code verwende ich einen Datensatz von kaggle. Der Link lautet: https://www.kaggle.com/carrie1/ecommerce-data
1. Ohne das Schema zu erwähnen:
Überprüfen Sie nun die Spalten: sdfData.columns
Ausgabe wird sein:
Überprüfen Sie den Datentyp für jede Spalte:
Dadurch wird der Datenrahmen mit allen Spalten mit dem Datentyp StringType angegeben
2. Mit Schema: Wenn Sie das Schema kennen oder den Datentyp einer Spalte in der obigen Tabelle ändern möchten, verwenden Sie diese (Angenommen, ich habe folgende Spalten und möchte sie in einem bestimmten Datentyp für jede Spalte haben).
Überprüfen Sie nun das Schema für den Datentyp jeder Spalte:
Bearbeitet: Wir können auch die folgende Codezeile verwenden, ohne das Schema explizit zu erwähnen:
Die Ausgabe ist:
Die Ausgabe sieht folgendermaßen aus:
quelle
Bei der Verwendung
spark.read.csv
stelle ich fest, dass die Verwendung der Optionenescape='"'
undmultiLine=True
die konsistenteste Lösung für den CSV-Standard und meiner Erfahrung nach am besten mit aus Google Sheets exportierten CSV-Dateien funktioniert.Das ist,
quelle
import pyspark as spark
?spark
ist bereits initialisiert. In einem von eingereichten Skriptspark-submit
können Sie es als instanziierenfrom pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()
.