Data Flow et Delta Lake

Data Flow prend en charge Delta Lake par défaut lorsque vos applications exécutent Spark 3.2.1 ou une version ultérieure.

Delta Lake vous permet de créer une architecture lakehouse sur des lacs de données. Delta Lake fournit des transactions ACID et une gestion évolutive des métadonnées, et unifie la transmission en continu et le traitement de données en batch sur les lacs de données existants. Delta Lake 3.1.0 est pris en charge avec le moteur de traitement Spark 3.5.0 Data Flow, Delta Lake 2.0.1 et 1.2.1 sont pris en charge avec le moteur de traitement Spark 3.2.1 Data Flow.

Pour utiliser Delta Lake avec Data Flow, procédez comme suit :
  • La version de Spark dans Data Flow doit être 3.2.1 (ou une version ultérieure).
  • Utilisez le format delta.
Pour plus d'informations sur Delta Lake et son utilisation, reportez-vous aux notes sur la version de Delta Lake et à la documentation Delta Lake.

Charger le lac Delta

Suivez ces étapes pour charger Delta Lake à utiliser avec Data Flow.

Utilisez la propriété de configuration Spark, spark.oracle.deltalake.version, pour indiquer la version de Delta Lake à utiliser. Définissez-la sur l'une des valeurs suivantes :
Valeurs Spark.oracle.deltalake.version
Version de Spark Valeur de spark.oracle.deltalake.version Fichiers binaires chargés
3,5 3.1.0 Delta Lake 3.1.0
3.2.1 2.0.1 Delta Lake 2.0.1
3.2.1 1.2.1 Delta Lake 1.2.1
3,5 3,2 none Aucun fichier binaire Delta Lake n'est chargé. Vous devez le fournir.
Remarque

Si vous ne définissez pas de valeur pour spark.oracle.deltalake.version, les fichiers binaires Delta Lake 1.2.1 sont chargés par défaut.

Si vous définissez spark.oracle.deltalake.version sur none, vous devez fournir les bibliothèques de dépendances Delta Lake dans le cadre du fichier JAR d'application. Pour plus d'informations, reportez-vous à la documentation publique de Delta Lake.

Par exemple, pour charger Delta Lake 3.1.0, packagez les bibliothèques suivantes :
  • delta-storage-3.1.0.jar
  • delta-spark_2.12-3.1.0.jar
  • delta-contribs_2.12-3.1.0.jar
et procédez comme suit :
  1. Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 3.1.0 à partir du référentiel maven :
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-spark_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-contribs_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
  2. Définissez la configuration Spark pour activer Delta Lake :
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Par exemple, pour charger Delta Lake 2.0.1, packagez les bibliothèques suivantes :
  • delta-core_2.12-2.0.1.jar
  • delta-contribs_2.12-2.0.1.jar
  • delta-storage-2.0.1.jar
et procédez comme suit :
  1. Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 2.0.1 à partir du référentiel maven :
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>2.0.1</version>
    </dependency> 
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-contribs_2.12</artifactId>
      <version>2.0.1</version>
      </dependency>
    Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
  2. Définissez la configuration Spark pour activer Delta Lake :
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Remarque

Vous pouvez également activer Delta Lake dans les options avancées fournies dans la console lors de la création d'une application ou de l'exécution d'une application.

Exemple d'utilisation de l'API Delta Lake

Voici des exemples d'utilisation de l'API Delta Lake avec Data Flow.

Le moteur Spark Data Flow prend en charge le format delta par défaut. Les API Delta Lake sont disponibles pour les langages Java, Python et Scala. Si vous utilisez des API Python Delta Lake, utilisez l'utilitaire de package de dépendances archive.zip personnalisé, incluez le package delta-spark, comme décrit dans Fonctionnalité de soumission Spark dans Data Flow.

Exemples d'utilisation

Java ou Scala
spark.read().format("delta").load(<path_to_Delta_table>)
df.write().format("delta").save(<path_to_Delta_table>)
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
Python
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
deltaTable.history()
SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`");
spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");

Exemples

Voici des exemples de code pour vous aider à commencer à utiliser Delta Lake avec Data Flow.

Les exemples sont disponibles à partir des échantillons Oracle sur GitHub.