Flux de données et Delta Lake

Le service de flux de données prend en charge Delta Lake par défaut lorsque vos applications exécutent Spark 3.2.1 ou version ultérieure.

Delta Lake vous permet de créer une architecture d'entrepôt avec lac de données au-dessus des lacs de données. Delta Lake fournit des transactions ACID, un traitement de métadonnées évolutif et il unifie le traitement de données en continu et par lots au-dessus des lacs de données existants. Delta Lake 3.1.0 est pris en charge avec le moteur de traitement Spark 3.5.0 du service de flux de données, Delta Lake 2.0.1 et 1.2.1 sont pris en charge avec le moteur de traitement Spark 3.2.1 du service de flux de données.

Pour utiliser Delta Lake avec le service de flux de données :
  • La version de Spark dans le service de flux de données doit être 3.2.1 (ou une version ultérieure).
  • Utilisez le format delta.
Pour plus d'informations sur Delta Lake et son utilisation, consultez les notes de version de Delta Lake et la documentation sur Delta Lake.

Charger le lac Delta

Suivez ces étapes pour charger Delta Lake à utiliser avec le service de flux de données.

Utilisez la propriété de configuration Spark, spark.oracle.deltalake.version, pour spécifier la version de Delta Lake à utiliser. Réglez-le à l'une des valeurs suivantes :
Valeurs Spark.oracle.deltalake.version
Version de Spark Valeur de spark.oracle.deltalake.version Fichiers binaires chargés
3.5 3.1.0 Delta Lake 3.1.0
3.2.1 2.0.1 Delta Lake 2.0.1
3.2.1 1.2.1 Delta Lake 1.2.1
3.5 3.2 none Aucun fichier binaire Delta Lake n'est chargé, vous devez les fournir.
Note

Si vous ne définissez pas de valeur pour spark.oracle.deltalake.version, les fichiers binaires Delta Lake 1.2.1 sont chargés par défaut.

Si vous réglez spark.oracle.deltalake.version à none, vous devez fournir les bibliothèques de dépendances Delta Lake dans le fichier JAR de l'application. Pour plus d'informations, reportez-vous à la documentation publique sur Delta Lake.

Par exemple, pour charger Delta Lake 3.1.0, regroupez les bibliothèques suivantes :
  • delta-storage-3.1.0.jar
  • delta-spark_2.12-3.1.0.jar
  • delta-contributions_2.12-3.1.0.jar
et procédez comme suit :
  1. Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 3.1.0 à partir du référentiel maven :
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-spark_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-contribs_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
  2. Définissez la configuration Spark pour activer Delta Lake :
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Par exemple, pour charger Delta Lake 2.0.1, regroupez les bibliothèques suivantes :
  • delta-core_2.12-2.0.1.jar
  • delta-contributions_2.12-2.0.1.jar
  • delta-storage-2.0.1.jar
et procédez comme suit :
  1. Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 2.0.1 à partir du référentiel maven :
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>2.0.1</version>
    </dependency> 
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-contribs_2.12</artifactId>
      <version>2.0.1</version>
      </dependency>
    Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
  2. Définissez la configuration Spark pour activer Delta Lake :
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Note

Vous pouvez également activer Delta Lake dans les options avancées fournies dans la console, lors de la création d'une application ou de l'exécution d'une application.

Exemple d'utilisation de l'API Delta Lake

Exemples d'utilisation de l'API Delta Lake avec le service de flux de données.

Le moteur Spark du service de flux de données prend en charge le format delta par défaut. Les API Delta Lake sont disponibles pour les langages Java, Python et Scala. Si vous utilisez des API Python Delta Lake, utilisez l'encapsuleur de dépendances archive.zip personnalisé, incluez l'ensemble delta-spark, comme décrit dans Fonctionnalité Spark-Submit dans le service de flux de données.

Exemples d'utilisation

Java ou Scala
spark.read().format("delta").load(<path_to_Delta_table>)
df.write().format("delta").save(<path_to_Delta_table>)
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
Python
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
deltaTable.history()
SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`");
spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");

Exemples

Voici quelques exemples de code pour vous aider à commencer à utiliser Delta Lake avec le service de flux de données

Des exemples sont disponibles dans Exemples Oracle sur GitHub.