Wie speichere ich DataFrame direkt in Hive?

85

Ist es möglich, DataFrameFunken direkt bei Hive zu speichern ?

Ich habe versucht , mit der Umwandlung DataFramezu Rddund dann als Textdatei speichern und dann in Hive zu laden. Aber ich frage mich, ob ich direkt sparen kann, um dataframezu leben

Gourav
quelle

Antworten:

116

Sie können eine temporäre In-Memory-Tabelle erstellen und diese mit sqlContext in einer Hive-Tabelle speichern.

Nehmen wir an, Ihr Datenrahmen ist myDf. Sie können eine temporäre Tabelle erstellen mit:

myDf.createOrReplaceTempView("mytempTable") 

Anschließend können Sie mit einer einfachen Hive-Anweisung eine Tabelle erstellen und die Daten aus Ihrer temporären Tabelle sichern.

sqlContext.sql("create table mytable as select * from mytempTable");
Vinay Kumar
quelle
2
Dies umging die Parkett-Lesefehler, die ich bei der Verwendung von write.saveAsTable in spark 2.0 bekam
ski_squaw
2
Ja. Wir können jedoch Partition by on Data Frame verwenden, bevor wir die temporäre Tabelle erstellen. @chhantyal
Vinay Kumar
1
Wie konnten Sie den temporaryTisch mit dem hiveTisch kombinieren? Dabei sind show tablesnur die hiveTabellen für meine spark 2.3.0Installation enthalten
javadba
1
Diese temporäre Tabelle wird in Ihrem Hive-Kontext gespeichert und gehört in keiner Weise zu Hive-Tabellen.
Vinay Kumar
1
hi @VinayKumar, warum Sie sagen "Wenn Sie saveAsTable verwenden (es ist eher so, als würden Sie Ihren Datenrahmen beibehalten), müssen Sie sicherstellen, dass Ihrer Spark-Anwendung genügend Speicher zugewiesen ist." Könnten Sie diesen Punkt erklären?
enneppi
27

Verwenden Sie DataFrameWriter.saveAsTable. ( df.write.saveAsTable(...)) Siehe Spark SQL- und DataFrame-Handbuch .

Daniel Darabos
quelle
4
saveAsTable erstellt keine Hive-kompatiblen Tabellen. Die beste Lösung, die ich gefunden habe, ist Vinay Kumar.
RChat
@ Jacek: Ich habe diese Notiz selbst hinzugefügt, weil ich denke, dass meine Antwort falsch ist. Ich würde es löschen, außer dass es akzeptiert wird. Denken Sie, dass die Notiz falsch ist?
Daniel Darabos
Ja. Die Notiz war falsch und deshalb habe ich sie entfernt. "Bitte korrigieren Sie mich, wenn ich falsch liege" gilt hier :)
Jacek Laskowski
1
Wird dies df.write().saveAsTable(tableName) auch Streaming-Daten in die Tabelle schreiben?
user1870400
1
Nein, Sie können Streaming-Daten nicht mit saveAsTable speichern. Es ist nicht einmal in der API
Brian
20

Ich sehe nicht df.write.saveAsTable(...)veraltet in der Spark 2.0-Dokumentation. Es hat bei uns bei Amazon EMR funktioniert. Wir konnten Daten aus S3 perfekt in einen Datenrahmen einlesen, verarbeiten, aus dem Ergebnis eine Tabelle erstellen und mit MicroStrategy lesen. Vinays Antwort hat aber auch funktioniert.

Alex
quelle
5
Jemand hat diese Antwort aufgrund von Länge und Inhalt als minderwertig gekennzeichnet. Um ehrlich zu sein, wäre es wahrscheinlich besser als Kommentar gewesen. Ich denke, es ist seit zwei Jahren in Betrieb und einige Leute fanden es hilfreich, also könnte es gut sein, die Dinge so zu lassen, wie sie sind?
Serakfalcon
Ich stimme zu, Kommentar wäre die bessere Wahl gewesen. Lektion gelernt :-)
Alex
15

Sie müssen einen HiveContext haben / erstellen

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

Speichern Sie dann den Datenrahmen direkt oder wählen Sie die Spalten aus, die als Hive-Tabelle gespeichert werden sollen

df ist ein Datenrahmen

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

oder

df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");

oder

df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

SaveModes sind Append / Ignore / Overwrite / ErrorIfExists

Ich habe hier die Definition für HiveContext aus der Spark-Dokumentation hinzugefügt.

Zusätzlich zum grundlegenden SQLContext können Sie auch einen HiveContext erstellen, der eine Obermenge der vom grundlegenden SQLContext bereitgestellten Funktionen bietet. Weitere Funktionen sind die Möglichkeit, Abfragen mit dem vollständigeren HiveQL-Parser zu schreiben, auf Hive-UDFs zuzugreifen und Daten aus Hive-Tabellen zu lesen. Um einen HiveContext verwenden zu können, muss kein Hive-Setup vorhanden sein, und alle für einen SQLContext verfügbaren Datenquellen sind weiterhin verfügbar. HiveContext wird nur separat gepackt, um zu vermeiden, dass alle Abhängigkeiten von Hive in den Standard-Spark-Build aufgenommen werden.


In Spark Version 1.6.2 führt die Verwendung von "dbName.tableName" zu folgendem Fehler:

org.apache.spark.sql.AnalysisException: Die Angabe des Datenbanknamens oder anderer Qualifikationsmerkmale ist für temporäre Tabellen nicht zulässig. Wenn der Tabellenname Punkte (.) Enthält, geben Sie den Tabellennamen mit Backticks () an. "

Anandkumar
quelle
Ist der zweite Befehl: 'df.select (df.col ("col1"), df.col ("col2"), df.col ("col3")) .write (). Mode ("overwrite"). SaveAsTable ("schemaName.tableName"); ' Möchten Sie, dass die ausgewählten Spalten, die Sie überschreiben möchten, bereits in der Tabelle vorhanden sind? Sie haben also die vorhandene Tabelle und überschreiben nur die vorhandenen Spalten 1, 2, 3 mit den neuen Daten von Ihrem df in spark? ist das richtig interpretiert?
dieHellste
3
df.write().mode...muss geändert werden zudf.write.mode...
Benutzer 923227
8

Das Speichern in Hive ist nur eine Frage der Verwendung der write()Methode Ihres SQLContext:

df.write.saveAsTable(tableName)

Siehe https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)

Ab Spark 2.2: Verwenden Sie stattdessen DataSet, dann DataFrame.

Raktotpal Bordoloi
quelle
Ich habe anscheinend einen Fehler, der besagt, dass Job abgebrochen wurde. Ich habe versucht, den folgenden Code pyspark_df.write.mode ("überschreiben"). SaveAsTable ("InjuryTab2")
Sade
Hallo! warum das? From Spark 2.2: use DataSet instead DataFrame.
Onofricamila
3

Es tut mir leid, dass ich zu spät in den Beitrag geschrieben habe, aber ich sehe keine akzeptierte Antwort.

df.write().saveAsTablewird werfen AnalysisExceptionund ist nicht HIVE-Tabelle kompatibel.

DF so speichern, wie df.write().format("hive")es der Trick sein sollte!

Wenn dies jedoch nicht funktioniert, ist dies meiner Meinung nach die beste Lösung (offen für Vorschläge).

Der beste Ansatz besteht darin, eine HIVE-Tabelle (einschließlich der PARTITIONED-Tabelle) explizit zu erstellen.

def createHiveTable: Unit ={
spark.sql("CREATE TABLE $hive_table_name($fields) " +
  "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}

DF als temporäre Tabelle speichern,

df.createOrReplaceTempView("$tempTableName")

und in die PARTITIONED HIVE-Tabelle einfügen:

spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)

Natürlich ist die LETZTE SPALTE in DF die PARTITIONSSÄULE. Erstellen Sie also eine entsprechende HIVE- Tabelle!

Bitte kommentieren Sie, ob es funktioniert! oder nicht.


--AKTUALISIEREN--

df.write()
  .partitionBy("$partition_column")
  .format("hive")
  .mode(SaveMode.append)
  .saveAsTable($new_table_name_to_be_created_in_hive)  //Table should not exist OR should be a PARTITIONED table in HIVE
Harshv
quelle
1

Hier ist die PySpark-Version zum Erstellen einer Hive-Tabelle aus einer Parkettdatei. Möglicherweise haben Sie Parkettdateien mithilfe des abgeleiteten Schemas generiert und möchten nun die Definition in den Hive-Metastore verschieben. Sie können die Definition auch auf das System wie AWS Glue oder AWS Athena übertragen und nicht nur auf den Hive-Metastore. Hier verwende ich spark.sql, um eine permanente Tabelle zu pushen / zu erstellen.

   # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")
    cols = df.dtypes
    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes =  df.dtypes
    sizeof = len(df.dtypes)
    print ("size----------",sizeof)
    count=1;
    for eachvalue in keyanddatatypes:
        print count,sizeof,eachvalue
        if count == sizeof:
            total = str(eachvalue[0])+str(' ')+str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',')
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append("'")
    buf.append('s3://my-location/data/')
    buf.append("'")
    buf.append("'")
    ##partition by pt
    tabledef = ''.join(buf)

    print "---------print definition ---------"
    print tabledef
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);
kartik
quelle
1

Für externe Hive-Tabellen verwende ich diese Funktion in PySpark:

def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    output_schema = "," \
        .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
        .replace("StringType", "STRING") \
        .replace("IntegerType", "INT") \
        .replace("DateType", "DATE") \
        .replace("LongType", "INT") \
        .replace("TimestampType", "INT") \
        .replace("BooleanType", "BOOLEAN") \
        .replace("FloatType", "FLOAT")\
        .replace("DoubleType","FLOAT")
    output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)

    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))

    query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
        .format(database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
Shadowtrooper
quelle
1

In meinem Fall funktioniert das gut:

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()

Erledigt!!

Sie können die Daten lesen, als "Mitarbeiter" angeben

hive.executeQuery("select * from Employee").show()

Für weitere Informationen verwenden Sie diese URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html

MD Rijwan
quelle
0

Wenn Sie eine Hive-Tabelle (die nicht vorhanden ist) aus einem Datenrahmen erstellen möchten (manchmal kann sie nicht mit erstellt werden DataFrameWriter.saveAsTable). StructType.toDDLwill hilft beim Auflisten der Spalten als Zeichenfolge.

val df = ...

val schemaStr = df.schema.toDDL # This gives the columns 
spark.sql(s"""create table hive_table ( ${schemaStr})""")

//Now write the dataframe to the table
df.write.saveAsTable("hive_table")

hive_tablewird im Standardbereich erstellt, da wir bei keine Datenbank angegeben haben spark.sql(). stg.hive_tablekann zum Erstellen hive_tablein der stgDatenbank verwendet werden.

mrsrinivas
quelle
Detailliertes Beispiel finden Sie hier: stackoverflow.com/a/56833395/1592191
mrsrinivas
0

Sie könnten die Hortonworks Spark-Llap- Bibliothek wie diese verwenden

import com.hortonworks.hwc.HiveWarehouseSession

df.write
  .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
  .mode("append")
  .option("table", "myDatabase.myTable")
  .save()
Mike
quelle