Flusso di dati e Delta Lake
Il flusso di dati supporta Delta Lake per impostazione predefinita quando le applicazioni eseguono Spark 3.2.1 o versioni successive.
Delta Lake ti consente di creare un'architettura di Lakehouse sui data lake. Delta Lake fornisce transazioni ACID, gestione scalabile dei metadati e unifica l'elaborazione dei dati in streaming e in batch sui data lake esistenti. Delta Lake 3.1.0 è supportato con il motore di elaborazione Data Flow Spark 3.5.0, Delta Lake 2.0.1 e 1.2.1 sono supportati con il motore di elaborazione Data Flow Spark 3.2.1.
- La versione di Spark in Data Flow deve essere 3.2.1 (o successiva).
- Utilizzare il formato delta.
Carica Delta Lake
Attenersi alla procedura riportata di seguito per caricare Delta Lake da utilizzare con Data Flow.
spark.oracle.deltalake.version
, per specificare la versione di Delta Lake da utilizzare. Impostarlo su uno dei seguenti valori:Versione Spark | Valore di spark.oracle.deltalake.version | File binari caricati |
---|---|---|
3,5 | 3.1.0 |
Delta Lake 3.1.0 |
3,2 | 2.0.1 |
Delta Lake 2.0.1 |
3,2 | 1.2.1 |
Delta Lake 1.2.1 |
3,5 3,2 | none |
Non sono stati caricati file binari Delta Lake. È necessario fornirli. |
Se non si imposta un valore per
spark.oracle.deltalake.version
, i file binari Delta Lake 1.2.1 vengono caricati per impostazione predefinita.Se si imposta spark.oracle.deltalake.version
su none
, è necessario fornire le librerie di dipendenze Delta Lake come parte del file JAR dell'applicazione. Per ulteriori informazioni, consulta la documentazione pubblica di Delta Lake.
- delta-storage-3.1.0.jar
- delta-spark_2.12-3.1.0.jar
- contributi delta_2.12-3.1.0.jar
- Per le applicazioni Java o Scala, fornire la dipendenza Delta Lake 3.1.0 dal repository maven:In alternativa, per le applicazioni Python, creare un pacchetto della libreria Delta Lake e fornirla all'applicazione.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency>
- Impostare la configurazione Spark per abilitare Delta Lake:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- delta-core_2.12-2.0.1.jar
- contributi_2.12-2.0.1.jar delta
- storage delta-2.0.1.jar
- Per le applicazioni Java o Scala, fornire la dipendenza Delta Lake 2.0.1 dal repository maven:In alternativa, per le applicazioni Python, creare un pacchetto della libreria Delta Lake e fornirla all'applicazione.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency>
- Impostare la configurazione Spark per abilitare Delta Lake:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
È inoltre possibile abilitare Delta Lake nelle opzioni avanzate fornite nella console durante la creazione di un'applicazione o l'esecuzione di un'applicazione.
Esempio di utilizzo dell'API Delta Lake
Esempi di utilizzo dell'API Delta Lake con Data Flow.
Il modulo di gestione Spark di Flusso dati supporta il formato delta
per impostazione predefinita. Le API Delta Lake sono disponibili per i linguaggi Java, Python e Scala. Se si utilizzano le API Python di Delta Lake, utilizzare il pacchetto di dipendenze archive.zip personalizzato, includere il pacchetto delta-spark, come descritto in Funzionalità Spark-Submit in Data Flow.
Esempi di utilizzo
- Java o Scala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum()
- Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history()
- SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
Esempi
Di seguito sono riportati alcuni esempi di codice utili per iniziare a utilizzare Delta Lake con Data Flow
Esempi sono disponibili negli campioni Oracle sul sito GitHub.