Flux de données et Delta Lake
Le service de flux de données prend en charge Delta Lake par défaut lorsque vos applications exécutent Spark 3.2.1 ou version ultérieure.
Delta Lake vous permet de créer une architecture d'entrepôt avec lac de données au-dessus des lacs de données. Delta Lake fournit des transactions ACID, un traitement de métadonnées évolutif et il unifie le traitement de données en continu et par lots au-dessus des lacs de données existants. Delta Lake 3.1.0 est pris en charge avec le moteur de traitement Spark 3.5.0 du service de flux de données, Delta Lake 2.0.1 et 1.2.1 sont pris en charge avec le moteur de traitement Spark 3.2.1 du service de flux de données.
- La version de Spark dans le service de flux de données doit être 3.2.1 (ou une version ultérieure).
- Utilisez le format delta.
Charger le lac Delta
Suivez ces étapes pour charger Delta Lake à utiliser avec le service de flux de données.
spark.oracle.deltalake.version
, pour spécifier la version de Delta Lake à utiliser. Réglez-le à l'une des valeurs suivantes :Version de Spark | Valeur de spark.oracle.deltalake.version | Fichiers binaires chargés |
---|---|---|
3.5 | 3.1.0 |
Delta Lake 3.1.0 |
3.2.1 | 2.0.1 |
Delta Lake 2.0.1 |
3.2.1 | 1.2.1 |
Delta Lake 1.2.1 |
3.5 3.2 | none |
Aucun fichier binaire Delta Lake n'est chargé, vous devez les fournir. |
Si vous ne définissez pas de valeur pour
spark.oracle.deltalake.version
, les fichiers binaires Delta Lake 1.2.1 sont chargés par défaut.Si vous réglez spark.oracle.deltalake.version
à none
, vous devez fournir les bibliothèques de dépendances Delta Lake dans le fichier JAR de l'application. Pour plus d'informations, reportez-vous à la documentation publique sur Delta Lake.
- delta-storage-3.1.0.jar
- delta-spark_2.12-3.1.0.jar
- delta-contributions_2.12-3.1.0.jar
- Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 3.1.0 à partir du référentiel maven :Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency>
- Définissez la configuration Spark pour activer Delta Lake :
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- delta-core_2.12-2.0.1.jar
- delta-contributions_2.12-2.0.1.jar
- delta-storage-2.0.1.jar
- Pour les applications Java ou Scala, fournissez la dépendance Delta Lake 2.0.1 à partir du référentiel maven :Ou pour les applications Python, packagez la bibliothèque Delta Lake et fournissez-la à l'application.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency>
- Définissez la configuration Spark pour activer Delta Lake :
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Vous pouvez également activer Delta Lake dans les options avancées fournies dans la console, lors de la création d'une application ou de l'exécution d'une application.
Exemple d'utilisation de l'API Delta Lake
Exemples d'utilisation de l'API Delta Lake avec le service de flux de données.
Le moteur Spark du service de flux de données prend en charge le format delta
par défaut. Les API Delta Lake sont disponibles pour les langages Java, Python et Scala. Si vous utilisez des API Python Delta Lake, utilisez l'encapsuleur de dépendances archive.zip personnalisé, incluez l'ensemble delta-spark, comme décrit dans Fonctionnalité Spark-Submit dans le service de flux de données.
Exemples d'utilisation
- Java ou Scala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum()
- Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history()
- SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
Exemples
Voici quelques exemples de code pour vous aider à commencer à utiliser Delta Lake avec le service de flux de données
Des exemples sont disponibles dans Exemples Oracle sur GitHub.