Flusso di dati e Delta Lake

Il flusso di dati supporta Delta Lake per impostazione predefinita quando le applicazioni eseguono Spark 3.2.1 o versioni successive.

Delta Lake ti consente di creare un'architettura di Lakehouse sui data lake. Delta Lake fornisce transazioni ACID, gestione scalabile dei metadati e unifica l'elaborazione dei dati in streaming e in batch sui data lake esistenti. Delta Lake 3.1.0 è supportato con il motore di elaborazione Data Flow Spark 3.5.0, Delta Lake 2.0.1 e 1.2.1 sono supportati con il motore di elaborazione Data Flow Spark 3.2.1.

Per utilizzare Delta Lake con Data Flow:
  • La versione di Spark in Data Flow deve essere 3.2.1 (o successiva).
  • Utilizzare il formato delta.
Per ulteriori informazioni su Delta Lake e sul suo utilizzo, consulta le note di rilascio di Delta Lake e la documentazione di Delta Lake.

Carica Delta Lake

Attenersi alla procedura riportata di seguito per caricare Delta Lake da utilizzare con Data Flow.

Utilizzare la proprietà di configurazione Spark, spark.oracle.deltalake.version, per specificare la versione di Delta Lake da utilizzare. Impostarlo su uno dei seguenti valori:
Valori Spark.oracle.deltalake.version
Versione Spark Valore di spark.oracle.deltalake.version File binari caricati
3,5 3.1.0 Delta Lake 3.1.0
3,2 2.0.1 Delta Lake 2.0.1
3,2 1.2.1 Delta Lake 1.2.1
3,5 3,2 none Non sono stati caricati file binari Delta Lake. È necessario fornirli.
Nota

Se non si imposta un valore per spark.oracle.deltalake.version, i file binari Delta Lake 1.2.1 vengono caricati per impostazione predefinita.

Se si imposta spark.oracle.deltalake.version su none, è necessario fornire le librerie di dipendenze Delta Lake come parte del file JAR dell'applicazione. Per ulteriori informazioni, consulta la documentazione pubblica di Delta Lake.

Ad esempio, per caricare Delta Lake 3.1.0, creare il package di queste librerie:
  • delta-storage-3.1.0.jar
  • delta-spark_2.12-3.1.0.jar
  • contributi delta_2.12-3.1.0.jar
e procedere come segue:
  1. Per le applicazioni Java o Scala, fornire la dipendenza Delta Lake 3.1.0 dal repository maven:
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-spark_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-contribs_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    In alternativa, per le applicazioni Python, creare un pacchetto della libreria Delta Lake e fornirla all'applicazione.
  2. Impostare la configurazione Spark per abilitare Delta Lake:
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Ad esempio, per caricare Delta Lake 2.0.1, creare il package di queste librerie:
  • delta-core_2.12-2.0.1.jar
  • contributi_2.12-2.0.1.jar delta
  • storage delta-2.0.1.jar
e procedere come segue:
  1. Per le applicazioni Java o Scala, fornire la dipendenza Delta Lake 2.0.1 dal repository maven:
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>2.0.1</version>
    </dependency> 
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-contribs_2.12</artifactId>
      <version>2.0.1</version>
      </dependency>
    In alternativa, per le applicazioni Python, creare un pacchetto della libreria Delta Lake e fornirla all'applicazione.
  2. Impostare la configurazione Spark per abilitare Delta Lake:
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Nota

È inoltre possibile abilitare Delta Lake nelle opzioni avanzate fornite nella console durante la creazione di un'applicazione o l'esecuzione di un'applicazione.

Esempio di utilizzo dell'API Delta Lake

Esempi di utilizzo dell'API Delta Lake con Data Flow.

Il modulo di gestione Spark di Flusso dati supporta il formato delta per impostazione predefinita. Le API Delta Lake sono disponibili per i linguaggi Java, Python e Scala. Se si utilizzano le API Python di Delta Lake, utilizzare il pacchetto di dipendenze archive.zip personalizzato, includere il pacchetto delta-spark, come descritto in Funzionalità Spark-Submit in Data Flow.

Esempi di utilizzo

Java o Scala
spark.read().format("delta").load(<path_to_Delta_table>)
df.write().format("delta").save(<path_to_Delta_table>)
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
Python
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
deltaTable.history()
SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`");
spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");

Esempi

Di seguito sono riportati alcuni esempi di codice utili per iniziare a utilizzare Delta Lake con Data Flow

Esempi sono disponibili negli campioni Oracle sul sito GitHub.