Data Flow und Delta Lake
Data Flow unterstützt standardmäßig Delta Lake, wenn Ihre Anwendungen Spark 3.2.1 oder höher ausführen.
Mit Delta Lake können Sie eine Lakehouse-Architektur auf Basis von Data Lakes erstellen. Delta Lake bietet ACID-Transaktionen, skalierbare Metadatenverarbeitung und vereinheitlicht Streaming- und Batchdatenverarbeitung auf Basis von vorhandenen Data Lakes. Delta Lake 3.1.0 wird mit der Data Flow Spark 3.5.0-Verarbeitungs-Engine unterstützt. Delta Lake 2.0.1 und 1.2.1 werden mit der Data Flow Spark 3.2.1-Verarbeitungs-Engine unterstützt.
- Die Spark-Version in Data Flow muss 3.2.1 (oder höher) sein.
- Verwenden Sie das delta-Format.
Delta Lake laden
Führen Sie diese Schritte aus, um Delta Lake für die Verwendung mit Data Flow zu laden.
spark.oracle.deltalake.version, um anzugeben, welche Version von Delta Lake verwendet werden soll. Stellen Sie ihn auf einen der folgenden Werte ein:| Spark-Version | Wert von spark.oracle.deltalake.version | Binärdateien geladen |
|---|---|---|
| 3,5 |
3.1.0
|
Delta Lake 3.1.0 |
| 3.2.1 |
2.0.1
|
Delta Lake 2.0.1 |
| 3.2.1 |
1.2.1
|
Delta Lake 1.2.1 |
| 3,5 3,2 |
none
|
Es wurden keine Delta Lake-Binärdateien geladen. Sie müssen sie angeben. |
Wenn Sie keinen Wert für
spark.oracle.deltalake.version festlegen, werden die Delta Lake 1.2.1-Binärdateien standardmäßig geladen.Wenn Sie spark.oracle.deltalake.version auf none setzen, müssen Sie die Delta Lake-Abhängigkeits-Librarys als Teil der Anwendungs-JAR angeben. Weitere Informationen finden Sie in der öffentlichen Dokumentation zu Delta Lake.
- delta-storage-3.1.0.jar
- Delta-Spark_2.12-3.1.0.jar
- Delta-Beiträge_2.12-3.1.0.jar
- Geben Sie für Java- oder Scala-Anwendungen die Delta Lake 3.1.0-Abhängigkeit vom Maven-Repository an:Oder stellen Sie für Python-Anwendungen die Delta Lake-Bibliothek in einem Package zusammen, und stellen Sie sie der Anwendung bereit.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency> - Legen Sie die Spark-Konfiguration so fest, dass Delta Lake aktiviert wird:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- delta-core_2.12-2.0.1.jar
- Delta-Beiträge_2.12-2.0.1.jar
- delta-storage-2.0.1.jar
- Geben Sie für Java- oder Scala-Anwendungen die Delta Lake 2.0.1-Abhängigkeit vom Maven-Repository an:Oder stellen Sie für Python-Anwendungen die Delta Lake-Bibliothek in einem Package zusammen, und stellen Sie sie der Anwendung bereit.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency> - Legen Sie die Spark-Konfiguration so fest, dass Delta Lake aktiviert wird:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Sie können Delta Lake auch in den erweiterten Optionen in der Konsole aktivieren, wenn Sie eine Anwendung erstellen oder eine Anwendung ausführen.
Beispielverwendung der Delta Lake-API
Beispiele für die Verwendung der Delta Lake-API mit Data Flow.
Die Data Flow-Spark-Engine unterstützt standardmäßig das delta-Format. Delta Lake-APIs sind für die Sprachen Java, Python und Scala verfügbar. Wenn Sie Delta Lake-Python-APIs verwenden, verwenden Sie den benutzerdefinierten archive.zip-Abhängigkeitspackager, nehmen Sie das Delta-Spark-Package auf, wie unter Spark-Submit-Funktionalität in Data Flow beschrieben.
Verwendungsbeispiele
- Java oder Scala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() - Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history() - SQL
-
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
Beispiele
Hier finden Sie Codebeispiele, mit denen Sie mit der Verwendung von Delta Lake mit Data Flow beginnen können.
Beispiele finden Sie in den Oracle-Beispielen auf GitHub.