Data Flow und Delta Lake
Data Flow unterstützt standardmäßig Delta Lake, wenn Ihre Anwendungen Spark 3.2.1 oder höher ausführen.
Mit Delta Lake können Sie eine Lakehouse-Architektur auf Basis von Data Lakes erstellen. Delta Lake bietet ACID-Transaktionen, skalierbare Metadatenverarbeitung und vereinheitlicht Streaming- und Batchdatenverarbeitung auf Basis von vorhandenen Data Lakes. Delta Lake 3.1.0 wird mit der Data Flow Spark 3.5.0-Verarbeitungs-Engine unterstützt. Delta Lake 2.0.1 und 1.2.1 werden mit der Data Flow Spark 3.2.1-Verarbeitungs-Engine unterstützt.
- Die Spark-Version in Data Flow muss 3.2.1 (oder höher) sein.
- Verwenden Sie das delta-Format.
Delta Lake laden
Führen Sie diese Schritte aus, um Delta Lake für die Verwendung mit Data Flow zu laden.
spark.oracle.deltalake.version
, um anzugeben, welche Version von Delta Lake verwendet werden soll. Stellen Sie ihn auf einen der folgenden Werte ein:Spark-Version | Wert von spark.oracle.deltalake.version | Binärdateien geladen |
---|---|---|
3,5 | 3.1.0 |
Delta Lake 3.1.0 |
3.2.1 | 2.0.1 |
Delta Lake 2.0.1 |
3.2.1 | 1.2.1 |
Delta Lake 1.2.1 |
3,5 3,2 | none |
Es wurden keine Delta Lake-Binärdateien geladen. Sie müssen sie angeben. |
Wenn Sie keinen Wert für
spark.oracle.deltalake.version
festlegen, werden die Delta Lake 1.2.1-Binärdateien standardmäßig geladen.Wenn Sie spark.oracle.deltalake.version
auf none
setzen, müssen Sie die Delta Lake-Abhängigkeits-Librarys als Teil der Anwendungs-JAR angeben. Weitere Informationen finden Sie in der öffentlichen Dokumentation zu Delta Lake.
- delta-storage-3.1.0.jar
- Delta-Spark_2.12-3.1.0.jar
- Delta-Beiträge_2.12-3.1.0.jar
- Geben Sie für Java- oder Scala-Anwendungen die Delta Lake 3.1.0-Abhängigkeit vom Maven-Repository an:Oder stellen Sie für Python-Anwendungen die Delta Lake-Bibliothek in einem Package zusammen, und stellen Sie sie der Anwendung bereit.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency>
- Legen Sie die Spark-Konfiguration so fest, dass Delta Lake aktiviert wird:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- delta-core_2.12-2.0.1.jar
- Delta-Beiträge_2.12-2.0.1.jar
- delta-storage-2.0.1.jar
- Geben Sie für Java- oder Scala-Anwendungen die Delta Lake 2.0.1-Abhängigkeit vom Maven-Repository an:Oder stellen Sie für Python-Anwendungen die Delta Lake-Bibliothek in einem Package zusammen, und stellen Sie sie der Anwendung bereit.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency>
- Legen Sie die Spark-Konfiguration so fest, dass Delta Lake aktiviert wird:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Sie können Delta Lake auch in den erweiterten Optionen in der Konsole aktivieren, wenn Sie eine Anwendung erstellen oder eine Anwendung ausführen.
Beispielverwendung der Delta Lake-API
Beispiele für die Verwendung der Delta Lake-API mit Data Flow.
Die Datenfluss-Spark-Engine unterstützt standardmäßig das delta
-Format. Delta Lake-APIs sind für die Sprachen Java, Python und Scala verfügbar. Wenn Sie Delta Lake-Python-APIs verwenden, verwenden Sie den benutzerdefinierten archive.zip-Abhängigkeitspackager, nehmen Sie das Delta-Spark-Package auf, wie unter Spark-Submit-Funktionalität in Data Flow beschrieben.
Verwendungsbeispiele
- Java oder Scala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum()
- Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history()
- SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
Beispiele
Hier finden Sie Codebeispiele, mit denen Sie mit der Verwendung von Delta Lake mit Data Flow beginnen können.
Beispiele finden Sie in den Oracle-Beispielen auf GitHub.