Data Flow et Delta Lake
Data Flow prend en charge Delta Lake par défaut lorsque vos applications exécutent Spark 3.2.1 ou une version ultérieure.
Delta Lake vous permet de créer une architecture lakehouse sur des lacs de données. Delta Lake fournit des transactions ACID et une gestion évolutive des métadonnées, et unifie la transmission en continu et le traitement de données en batch sur les lacs de données existants. Delta Lake 3.1.0 est pris en charge avec le moteur de traitement Spark 3.5.0 Data Flow, Delta Lake 2.0.1 et 1.2.1 sont pris en charge avec le moteur de traitement Spark 3.2.1 Data Flow.
- La version de Spark dans Data Flow doit être 3.2.1 (ou une version ultérieure).
- Utilisez le format delta.
Charger le lac Delta
Suivez ces étapes pour charger Delta Lake à utiliser avec Data Flow.
spark.oracle.deltalake.version
, pour indiquer la version de Delta Lake à utiliser. Définissez-la sur l'une des valeurs suivantes :Version de Spark | Valeur de spark.oracle.deltalake.version | Fichiers binaires chargés |
---|---|---|
3,5 | 3.1.0 |
Delta Lake 3.1.0 |
3.2.1 | 2.0.1 |
Delta Lake 2.0.1 |
3.2.1 | 1.2.1 |
Delta Lake 1.2.1 |
3,5 3,2 | none |
Aucun fichier binaire Delta Lake n'est chargé. Vous devez le fournir. |
Si vous ne définissez pas de valeur pour
spark.oracle.deltalake.version
, les fichiers binaires Delta Lake 1.2.1 sont chargés par défaut.Si vous définissez spark.oracle.deltalake.version
sur none
, vous devez fournir les bibliothèques de dépendances Delta Lake dans le cadre du fichier JAR d'application. Pour plus d'informations, reportez-vous à la documentation publique de Delta Lake.
- delta-storage-3.1.0.jar
- delta-spark_2.12-3.1.0.jar
- delta-contribs_2.12-3.1.0.jar
- Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 3.1.0 à partir du référentiel maven :Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency>
- Définissez la configuration Spark pour activer Delta Lake :
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- delta-core_2.12-2.0.1.jar
- delta-contribs_2.12-2.0.1.jar
- delta-storage-2.0.1.jar
- Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 2.0.1 à partir du référentiel maven :Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency>
- Définissez la configuration Spark pour activer Delta Lake :
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Vous pouvez également activer Delta Lake dans les options avancées fournies dans la console lors de la création d'une application ou de l'exécution d'une application.
Exemple d'utilisation de l'API Delta Lake
Voici des exemples d'utilisation de l'API Delta Lake avec Data Flow.
Le moteur Spark Data Flow prend en charge le format delta
par défaut. Les API Delta Lake sont disponibles pour les langages Java, Python et Scala. Si vous utilisez des API Python Delta Lake, utilisez l'utilitaire de package de dépendances archive.zip personnalisé, incluez le package delta-spark, comme décrit dans Fonctionnalité de soumission Spark dans Data Flow.
Exemples d'utilisation
- Java ou Scala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum()
- Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history()
- SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
Exemples
Voici des exemples de code pour vous aider à commencer à utiliser Delta Lake avec Data Flow.
Les exemples sont disponibles à partir des échantillons Oracle sur GitHub.