Wie überspringe ich einen Header aus CSV-Dateien in Spark?

68

Angenommen, ich gebe drei Dateipfade zu einem Spark-Kontext zum Lesen und jede Datei hat ein Schema in der ersten Zeile. Wie können wir Schemazeilen aus Headern überspringen?

val rdd=sc.textFile("file1,file2,file3")

Wie können wir nun Kopfzeilen von dieser Festplatte überspringen?

Hafiz Mujadid
quelle

Antworten:

70

Wenn der erste Datensatz nur eine Kopfzeile enthalten würde, wäre der effizienteste Weg, diese herauszufiltern:

rdd.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter 
}

Dies hilft nicht, wenn natürlich viele Dateien mit vielen Kopfzeilen enthalten sind. Sie können drei RDDs, die Sie auf diese Weise erstellen, zusammenschließen.

Sie können auch einfach filtereine Zeile schreiben , die nur einer Zeile entspricht, die eine Überschrift sein kann. Dies ist recht einfach, aber weniger effizient.

Python-Äquivalent:

from itertools import islice

rdd.mapPartitionsWithIndex(
    lambda idx, it: islice(it, 1, None) if idx == 0 else it 
)
Sean Owen
quelle
4
Die Filtermethode wäre immer noch effizienter als der zipWithIndex in der anderen Antwort vorgeschlagene Ansatz.
Maasg
Nein, es gibt nicht nur eine einzelne Zeile, es kann auch eine Zeile für jede Datei geben.
Hafiz Mujadid
Ja, ich meine, Sie könnten eine RDD für jede Datei erstellen und ihren einzelnen Header auf diese Weise entfernen, dann Union.
Sean Owen
fehlende und Drop (n) Methode hier
Julio
1
rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }Wie können Sie sagen, dass der Indexwert 0 ein Header ist? Es wird nicht helfen, es kann ein Header oder ein anderer Wert von CSV oder ein Header mit dem Wert sein
Shubham Agrawal
97
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header)   #filter out header
Jimmy
quelle
6
In der Frage wird gefragt, wie Header in einer CSV-Datei übersprungen werden sollen. Wenn Header jemals vorhanden sind, sind sie in der ersten Zeile vorhanden.
Jimmy
3
Das ist nicht immer wahr. Wenn Sie eine CSV mit Spark schreiben, können mehrere Dateien mit jeweils einem eigenen Header vorhanden sein. Wenn Sie dies als Eingabe für ein anderes Spark-Programm verwenden, erhalten Sie mehrere Header. Außerdem können Sie mit Spark mehrere Dateien gleichzeitig eingeben.
Sal
intuitive Annäherung
Jack AKA Karthik
Fehler: Rekursive Wertdaten müssen Typ sein. Ändern Sie die letzte Zeile in dataFiltered = data.filter (Zeile => Zeile! = Header)
Amit Sadafule
Scannt diese Lösung die gesamte Festplatte und überprüft jede Zeile, um den Header ganz oben einzufügen? Ist das wirklich der effizienteste Weg?
iLikeKFC
59

In Spark 2.0 ist ein CSV-Reader in Spark integriert, sodass Sie eine CSV-Datei wie folgt einfach laden können:

spark.read.option("header","true").csv("filePath")
Sandeep Purohit
quelle
Sind Sie sicher, dass dies ab 2.0 funktioniert? Ich verwende v2.0.1 und erhalte "AttributeError: 'SparkContext'-Objekt hat kein Attribut' read '".
Ciri
10
@ Ciri Spark ist kein SparkContext-Objekt, sondern das SparkSession-Objekt. Wenn Sie also einen CSV-Reader verwenden möchten, benötigen Sie das SparkSession-Objekt
Sandeep Purohit,
14

Ab Spark 2.0 können Sie SparkSession verwenden , um dies als Einzeiler zu erledigen:

val spark = SparkSession.builder.config(conf).getOrCreate()

und dann wie @SandeepPurohit sagte:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

Ich hoffe es hat deine Frage gelöst!

PS: SparkSession ist der neue Einstiegspunkt in Spark 2.0 und befindet sich unter dem Paket spark_sql

Shiv4nsh
quelle
7

In PySpark können Sie einen Datenrahmen verwenden und den Header auf True setzen:

df = spark.read.csv(dataPath, header=True)
hayj
quelle
alternativ context = new org.apache.spark.sql.SQLContext (sc); var data = context.read.option ("header", "true"). csv ("<Pfad>");
Sahan Jayasumana
5

Sie können jede Datei separat laden, mit filtern file.zipWithIndex().filter(_._2 > 0)und dann alle Datei-RDDs zusammenführen.

Wenn die Anzahl der Dateien zu groß ist, kann die Union a auslösen StackOverflowExeption.

pzecevic
quelle
4

Verwenden Sie die filter()Methode in PySpark, indem Sie den ersten Spaltennamen herausfiltern, um den Header zu entfernen:

# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)

# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)

# Check your result
for i in filterDD.take(5) : print (i)
kumara81205
quelle
Wie unterscheidet sich das von dieser gegebenen Antwort ? Für Ihre Antwort müssen Sie den Namen der ersten Spalte im Voraus kennen.
OneCricketeer
@ Cricket_007 Coz Dies filtert mehrere Header-Spalten heraus, wie von anderen Benutzern angegeben.
Abdul Mannan
4

Arbeiten im Jahr 2018 (Spark 2.3)

Python

df = spark.read
    .option("header", "true")
    .format("csv")
    .schema(myManualSchema)
    .load("mycsv.csv")

Scala

val myDf = spark.read
  .option("header", "true")
  .format("csv")
  .schema(myManualSchema)
  .load("mycsv.csv")

PD1: myManualSchema ist ein vordefiniertes Schema, das von mir geschrieben wurde. Sie können diesen Teil des Codes überspringen

Antonio Cachuan
quelle
1
Also, was ist die -1 für hier?
Thebluephantom
1
@Antonio Cachuan Sie sollten Code geben, der funktioniert, ohne dass Ihr persönliches Beispiel "Schema (myManualSchema)" überhaupt nicht in der Lösung sein sollte.
Enrique Benito Casado
1

Es ist eine Option, die Sie an den read()Befehl übergeben:

context = new org.apache.spark.sql.SQLContext(sc)

var data = context.read.option("header","true").csv("<path>")
Sahan Jayasumana
quelle
0

Alternativ können Sie das spark-csv-Paket verwenden (oder in Spark 2.0 ist dies mehr oder weniger nativ als CSV verfügbar). Beachten Sie, dass dies den Header für jede Datei erwartet (wie Sie es wünschen):

schema = StructType([
        StructField('lat',DoubleType(),True),
        StructField('lng',DoubleType(),True)])

df = sqlContext.read.format('com.databricks.spark.csv'). \
     options(header='true',
             delimiter="\t",
             treatEmptyValuesAsNulls=True,
             mode="DROPMALFORMED").load(input_file,schema=schema)
Adrian Bridgett
quelle
-2
//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{
    case (fileName, stream)=>
        val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
        (fileName, header)
}.collect().toMap

val fileNameHeaderBr = sc.broadcast(fileNameHeader)

// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
    if(iter.hasNext){
        val firstLine = iter.next()
        println(s"Comparing with firstLine $firstLine")
        if(firstLine == fileNameHeaderBr.value.head._2)
            new WrappedIterator(null, iter)
        else
            new WrappedIterator(firstLine, iter)
    }
    else {
        iter
    }
).collect().foreach(println)

class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
    var isFirstIteration = true
    override def hasNext: Boolean = {
        if (isFirstIteration && firstLine != null){
            true
        }
        else{
            iter.hasNext
        }
    }

    override def next(): String = {
        if (isFirstIteration){
            println(s"For the first time $firstLine")
            isFirstIteration = false
            if (firstLine != null){
                firstLine
            }
            else{
                println(s"Every time $firstLine")
                iter.next()
            }
        }
        else {
          iter.next()
        }
    }
}
RockSolid
quelle
-2

Für Python-Entwickler. Ich habe mit spark2.0 getestet. Angenommen, Sie möchten die ersten 14 Zeilen entfernen.

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

withColumn ist die Funktion df. Daher funktioniert unten nicht im oben verwendeten RDD-Stil.

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
kartik
quelle
1
Hallo Kartik, ich denke, Ihre Lösung befasst sich mit einer einzelnen Datei, aber die Frage war anders.
Hafiz Mujadid
Nur der erste Teil Ihres Codes ist korrekt. Eine monoton steigende ID garantiert keine fortlaufenden Nummern. Bitte sei so nett und überarbeite.
Alper t. Turker