Angenommen, ich gebe drei Dateipfade zu einem Spark-Kontext zum Lesen und jede Datei hat ein Schema in der ersten Zeile. Wie können wir Schemazeilen aus Headern überspringen?
val rdd=sc.textFile("file1,file2,file3")
Wie können wir nun Kopfzeilen von dieser Festplatte überspringen?
scala
csv
apache-spark
Hafiz Mujadid
quelle
quelle
zipWithIndex
in der anderen Antwort vorgeschlagene Ansatz.rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
Wie können Sie sagen, dass der Indexwert 0 ein Header ist? Es wird nicht helfen, es kann ein Header oder ein anderer Wert von CSV oder ein Header mit dem Wert seindata = sc.textFile('path_to_data') header = data.first() #extract header data = data.filter(row => row != header) #filter out header
quelle
In Spark 2.0 ist ein CSV-Reader in Spark integriert, sodass Sie eine CSV-Datei wie folgt einfach laden können:
spark.read.option("header","true").csv("filePath")
quelle
Ab Spark 2.0 können Sie SparkSession verwenden , um dies als Einzeiler zu erledigen:
val spark = SparkSession.builder.config(conf).getOrCreate()
und dann wie @SandeepPurohit sagte:
val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)
Ich hoffe es hat deine Frage gelöst!
PS: SparkSession ist der neue Einstiegspunkt in Spark 2.0 und befindet sich unter dem Paket spark_sql
quelle
In PySpark können Sie einen Datenrahmen verwenden und den Header auf True setzen:
df = spark.read.csv(dataPath, header=True)
quelle
Sie können jede Datei separat laden, mit filtern
file.zipWithIndex().filter(_._2 > 0)
und dann alle Datei-RDDs zusammenführen.Wenn die Anzahl der Dateien zu groß ist, kann die Union a auslösen
StackOverflowExeption
.quelle
Verwenden Sie die
filter()
Methode in PySpark, indem Sie den ersten Spaltennamen herausfiltern, um den Header zu entfernen:# Read file (change format for other file formats) contentRDD = sc.textfile(<filepath>) # Filter out first column of the header filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>) # Check your result for i in filterDD.take(5) : print (i)
quelle
Arbeiten im Jahr 2018 (Spark 2.3)
Python
df = spark.read .option("header", "true") .format("csv") .schema(myManualSchema) .load("mycsv.csv")
Scala
val myDf = spark.read .option("header", "true") .format("csv") .schema(myManualSchema) .load("mycsv.csv")
PD1: myManualSchema ist ein vordefiniertes Schema, das von mir geschrieben wurde. Sie können diesen Teil des Codes überspringen
quelle
Es ist eine Option, die Sie an den
read()
Befehl übergeben:context = new org.apache.spark.sql.SQLContext(sc) var data = context.read.option("header","true").csv("<path>")
quelle
Alternativ können Sie das spark-csv-Paket verwenden (oder in Spark 2.0 ist dies mehr oder weniger nativ als CSV verfügbar). Beachten Sie, dass dies den Header für jede Datei erwartet (wie Sie es wünschen):
schema = StructType([ StructField('lat',DoubleType(),True), StructField('lng',DoubleType(),True)]) df = sqlContext.read.format('com.databricks.spark.csv'). \ options(header='true', delimiter="\t", treatEmptyValuesAsNulls=True, mode="DROPMALFORMED").load(input_file,schema=schema)
quelle
//Find header from the files lying in the directory val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{ case (fileName, stream)=> val header = new BufferedReader(new InputStreamReader(stream.open())).readLine() (fileName, header) }.collect().toMap val fileNameHeaderBr = sc.broadcast(fileNameHeader) // Now let's skip the header. mapPartition will ensure the header // can only be the first line of the partition sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter => if(iter.hasNext){ val firstLine = iter.next() println(s"Comparing with firstLine $firstLine") if(firstLine == fileNameHeaderBr.value.head._2) new WrappedIterator(null, iter) else new WrappedIterator(firstLine, iter) } else { iter } ).collect().foreach(println) class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{ var isFirstIteration = true override def hasNext: Boolean = { if (isFirstIteration && firstLine != null){ true } else{ iter.hasNext } } override def next(): String = { if (isFirstIteration){ println(s"For the first time $firstLine") isFirstIteration = false if (firstLine != null){ firstLine } else{ println(s"Every time $firstLine") iter.next() } } else { iter.next() } } }
quelle
Für Python-Entwickler. Ich habe mit spark2.0 getestet. Angenommen, Sie möchten die ersten 14 Zeilen entfernen.
sc = spark.sparkContext lines = sc.textFile("s3://folder_location_of_csv/") parts = lines.map(lambda l: l.split(",")) parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn ist die Funktion df. Daher funktioniert unten nicht im oben verwendeten RDD-Stil.
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
quelle