Ist es möglich, DataFrame
Funken direkt bei Hive zu speichern ?
Ich habe versucht , mit der Umwandlung DataFrame
zu Rdd
und dann als Textdatei speichern und dann in Hive zu laden. Aber ich frage mich, ob ich direkt sparen kann, um dataframe
zu leben
scala
apache-spark
hive
apache-spark-sql
Gourav
quelle
quelle
temporary
Tisch mit demhive
Tisch kombinieren? Dabei sindshow tables
nur diehive
Tabellen für meinespark 2.3.0
Installation enthaltenVerwenden Sie
DataFrameWriter.saveAsTable
. (df.write.saveAsTable(...)
) Siehe Spark SQL- und DataFrame-Handbuch .quelle
df.write().saveAsTable(tableName)
auch Streaming-Daten in die Tabelle schreiben?Ich sehe nicht
df.write.saveAsTable(...)
veraltet in der Spark 2.0-Dokumentation. Es hat bei uns bei Amazon EMR funktioniert. Wir konnten Daten aus S3 perfekt in einen Datenrahmen einlesen, verarbeiten, aus dem Ergebnis eine Tabelle erstellen und mit MicroStrategy lesen. Vinays Antwort hat aber auch funktioniert.quelle
Sie müssen einen HiveContext haben / erstellen
import org.apache.spark.sql.hive.HiveContext; HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
Speichern Sie dann den Datenrahmen direkt oder wählen Sie die Spalten aus, die als Hive-Tabelle gespeichert werden sollen
df ist ein Datenrahmen
df.write().mode("overwrite").saveAsTable("schemaName.tableName");
oder
df.select(df.col("col1"),df.col("col2"), df.col("col3")) .write().mode("overwrite").saveAsTable("schemaName.tableName");
oder
df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");
SaveModes sind Append / Ignore / Overwrite / ErrorIfExists
Ich habe hier die Definition für HiveContext aus der Spark-Dokumentation hinzugefügt.
Zusätzlich zum grundlegenden SQLContext können Sie auch einen HiveContext erstellen, der eine Obermenge der vom grundlegenden SQLContext bereitgestellten Funktionen bietet. Weitere Funktionen sind die Möglichkeit, Abfragen mit dem vollständigeren HiveQL-Parser zu schreiben, auf Hive-UDFs zuzugreifen und Daten aus Hive-Tabellen zu lesen. Um einen HiveContext verwenden zu können, muss kein Hive-Setup vorhanden sein, und alle für einen SQLContext verfügbaren Datenquellen sind weiterhin verfügbar. HiveContext wird nur separat gepackt, um zu vermeiden, dass alle Abhängigkeiten von Hive in den Standard-Spark-Build aufgenommen werden.
In Spark Version 1.6.2 führt die Verwendung von "dbName.tableName" zu folgendem Fehler:
quelle
df.write().mode...
muss geändert werden zudf.write.mode...
Das Speichern in Hive ist nur eine Frage der Verwendung der
write()
Methode Ihres SQLContext:Siehe https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)
Ab Spark 2.2: Verwenden Sie stattdessen DataSet, dann DataFrame.
quelle
From Spark 2.2: use DataSet instead DataFrame.
Es tut mir leid, dass ich zu spät in den Beitrag geschrieben habe, aber ich sehe keine akzeptierte Antwort.
df.write().saveAsTable
wird werfenAnalysisException
und ist nicht HIVE-Tabelle kompatibel.DF so speichern, wie
df.write().format("hive")
es der Trick sein sollte!Wenn dies jedoch nicht funktioniert, ist dies meiner Meinung nach die beste Lösung (offen für Vorschläge).
Der beste Ansatz besteht darin, eine HIVE-Tabelle (einschließlich der PARTITIONED-Tabelle) explizit zu erstellen.
def createHiveTable: Unit ={ spark.sql("CREATE TABLE $hive_table_name($fields) " + "PARTITIONED BY ($partition_column String) STORED AS $StorageType") }
DF als temporäre Tabelle speichern,
df.createOrReplaceTempView("$tempTableName")
und in die PARTITIONED HIVE-Tabelle einfügen:
spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName") spark.sql("select * from default.$hive_table_name").show(1000,false)
Natürlich ist die LETZTE SPALTE in DF die PARTITIONSSÄULE. Erstellen Sie also eine entsprechende HIVE- Tabelle!
Bitte kommentieren Sie, ob es funktioniert! oder nicht.
--AKTUALISIEREN--
df.write() .partitionBy("$partition_column") .format("hive") .mode(SaveMode.append) .saveAsTable($new_table_name_to_be_created_in_hive) //Table should not exist OR should be a PARTITIONED table in HIVE
quelle
Hier ist die PySpark-Version zum Erstellen einer Hive-Tabelle aus einer Parkettdatei. Möglicherweise haben Sie Parkettdateien mithilfe des abgeleiteten Schemas generiert und möchten nun die Definition in den Hive-Metastore verschieben. Sie können die Definition auch auf das System wie AWS Glue oder AWS Athena übertragen und nicht nur auf den Hive-Metastore. Hier verwende ich spark.sql, um eine permanente Tabelle zu pushen / zu erstellen.
# Location where my parquet files are present. df = spark.read.parquet("s3://my-location/data/") cols = df.dtypes buf = [] buf.append('CREATE EXTERNAL TABLE test123 (') keyanddatatypes = df.dtypes sizeof = len(df.dtypes) print ("size----------",sizeof) count=1; for eachvalue in keyanddatatypes: print count,sizeof,eachvalue if count == sizeof: total = str(eachvalue[0])+str(' ')+str(eachvalue[1]) else: total = str(eachvalue[0]) + str(' ') + str(eachvalue[1]) + str(',') buf.append(total) count = count + 1 buf.append(' )') buf.append(' STORED as parquet ') buf.append("LOCATION") buf.append("'") buf.append('s3://my-location/data/') buf.append("'") buf.append("'") ##partition by pt tabledef = ''.join(buf) print "---------print definition ---------" print tabledef ## create a table using spark.sql. Assuming you are using spark 2.1+ spark.sql(tabledef);
quelle
Für externe Hive-Tabellen verwende ich diese Funktion in PySpark:
def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"): print("Saving result in {}.{}".format(database, table_name)) output_schema = "," \ .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \ .replace("StringType", "STRING") \ .replace("IntegerType", "INT") \ .replace("DateType", "DATE") \ .replace("LongType", "INT") \ .replace("TimestampType", "INT") \ .replace("BooleanType", "BOOLEAN") \ .replace("FloatType", "FLOAT")\ .replace("DoubleType","FLOAT") output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema) sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name)) query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \ .format(database, table_name, output_schema, save_format, database, table_name) sparkSession.sql(query) dataframe.write.insertInto('{}.{}'.format(database, table_name),overwrite = True)
quelle
In meinem Fall funktioniert das gut:
from pyspark_llap import HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build() hive.setDatabase("DatabaseName") df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv") df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()
Erledigt!!
Sie können die Daten lesen, als "Mitarbeiter" angeben
hive.executeQuery("select * from Employee").show()
Für weitere Informationen verwenden Sie diese URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html
quelle
val df = ... val schemaStr = df.schema.toDDL # This gives the columns spark.sql(s"""create table hive_table ( ${schemaStr})""") //Now write the dataframe to the table df.write.saveAsTable("hive_table")
hive_table
wird im Standardbereich erstellt, da wir bei keine Datenbank angegeben habenspark.sql()
.stg.hive_table
kann zum Erstellenhive_table
in derstg
Datenbank verwendet werden.quelle
Sie könnten die Hortonworks Spark-Llap- Bibliothek wie diese verwenden
import com.hortonworks.hwc.HiveWarehouseSession df.write .format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR) .mode("append") .option("table", "myDatabase.myTable") .save()
quelle