Ich verwende Spark 1.3.1 (PySpark) und habe mithilfe einer SQL-Abfrage eine Tabelle generiert. Ich habe jetzt ein Objekt, das a ist DataFrame
. Ich möchte dieses DataFrame
Objekt (ich habe es "Tabelle" genannt) in eine CSV-Datei exportieren, damit ich es bearbeiten und die Spalten zeichnen kann. Wie exportiere ich die DataFrame
"Tabelle" in eine CSV-Datei?
Vielen Dank!
quelle
df.write.csv('/tmp/lookatme/')
, wird eine Reihe von CSV-Dateien in/tmp/lookatme
Using Spark gelöscht. Dies ist erheblich schneller als die Serialisierung in Pandas. Der einzige Nachteil ist, dass Sie am Ende eine Reihe von CSVs anstelle einer einzigen haben. Wenn das Ziel-Tool nicht weiß, wie man sie verkettet, müssen Sie dies selbst tun.to_csv
funktioniert, ohne dass Pandas importiert werden müssen..toPandas
ist ein Teil von Spark, vielleicht importiert es es implizit ..df.coalesce(1).write.csv('mycsv.csv')
wenn Sie darauf bestehen, eine einzige Ausgabedatei zu habenFür Apache Spark 2+, um den Datenrahmen in einer einzelnen CSV-Datei zu speichern. Verwenden Sie den folgenden Befehl
query.repartition(1).write.csv("cc_out.csv", sep='|')
Hier
1
geben Sie an, dass ich nur eine Partition von csv benötige. Sie können es entsprechend Ihren Anforderungen ändern.quelle
repartition(1)
. Sie müssen die Daten dafür in beide Richtungen mischen Das Zusammenwachsen wird im Großen und Ganzen überhaupt nicht helfenWenn Sie spark-csv nicht verwenden können, können Sie Folgendes tun:
df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
Wenn Sie Zeichenfolgen mit Zeilenumbrüchen oder Komma behandeln müssen, funktioniert dies nicht. Benutze das:
import csv import cStringIO def row2csv(row): buffer = cStringIO.StringIO() writer = csv.writer(buffer) writer.writerow([str(s).encode("utf-8") for s in row]) buffer.seek(0) return buffer.read().strip() df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
quelle
Sie müssen den Datenrahmen in einer einzelnen Partition neu partitionieren und dann das Format, den Pfad und andere Parameter für die Datei im Unix-Dateisystemformat definieren.
df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')
Lesen Sie mehr über die Repartitionsfunktion Lesen Sie mehr über die Speicherfunktion
Die Neupartitionierung ist jedoch eine kostspielige Funktion und toPandas () ist am schlechtesten. Verwenden Sie in der vorherigen Syntax .coalesce (1) anstelle von .repartition (1), um eine bessere Leistung zu erzielen.
Lesen Sie mehr über Repartition vs Coalesce-Funktionen .
quelle
Wie wäre es damit (wenn Sie keinen Einzeiler wollen)?
for row in df.collect(): d = row.asDict() s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"]) f.write(s)
f ist ein geöffneter Dateideskriptor. Auch das Trennzeichen ist ein TAB-Zeichen, aber es ist einfach, zu ändern, was Sie wollen.
quelle