Serviços Data Flow e Delta Lake
O serviço Data Flow suporta o Delta Lake por padrão quando seus Aplicativos executam o Spark 3.2.1 ou mais recente.
O Delta Lake permite que você crie uma arquitetura Lakehouse com base em data lakes. O Delta Lake fornece transações ACID, tratamento escalável de metadados e unifica o streaming e o processamento de dados em batch sobre os data lakes existentes. O Delta Lake 3.1.0 é suportado com o mecanismo de processamento do Data Flow Spark 3.5.0, o Delta Lake 2.0.1 e o 1.2.1 são suportados com o mecanismo de processamento do Data Flow Spark 3.2.1.
- A versão do Spark no serviço Data Flow deve ser 3.2.1 (ou posterior).
- Use o formato delta.
Carregar Delta Lake
Siga estas etapas para carregar o Delta Lake a ser usado com o serviço Data Flow.
spark.oracle.deltalake.version
, para especificar qual versão do Delta Lake usar. Defina-o como um dos seguintes valores:Versão do Spark | Valor de spark.oracle.deltalake.version | Binários carregados |
---|---|---|
3.5 | 3.1.0 |
Delta Lake 3.1.0 |
3.2.1 | 2.0.1 |
Delta Lake 2.0.1 |
3.2.1 | 1.2.1 |
Delta Lake 1.2.1 |
3.5 3.2 | none |
Não há binários Delta Lake carregados, você deve fornecê-los. |
Se você não definir um valor para
spark.oracle.deltalake.version
, os binários do Delta Lake 1.2.1 serão carregados por padrão.Se você definir spark.oracle.deltalake.version
como none
, deverá fornecer as bibliotecas de dependência do Delta Lake como parte do JAR do aplicativo. Mais informações estão disponíveis na documentação pública do Delta Lake.
- delta-armazenamento-3.1.0.jar
- delta-spark_2.12-3.1.0.jar
- delta-contribs_2.12-3.1.0.jar
- Para aplicativos Java ou Scala, forneça a dependência do Delta Lake 3.1.0 do repositório maven:Ou para aplicativos Python, empacote a biblioteca Delta Lake e forneça-a ao aplicativo.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency>
- Defina a configuração do Spark para ativar o Delta Lake:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- delta-núcleo_2.12-2.0.1.jar
- delta-contribs_2.12-2.0.1.jar
- delta-armazenamento-2.0.1.jar
- Para aplicativos Java ou Scala, forneça a dependência do Delta Lake 2.0.1 do repositório maven:Ou para aplicativos Python, empacote a biblioteca Delta Lake e forneça-a ao aplicativo.
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency>
- Defina a configuração do Spark para ativar o Delta Lake:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Você também pode ativar o Delta Lake nas opções avançadas fornecidas na Console, ao criar um aplicativo ou executar um aplicativo.
Exemplo de Uso de API do Delta Lake
Exemplos do uso da API do Delta Lake com o serviço Data Flow.
O mecanismo Spark do serviço Data Flow suporta o formato delta
por padrão. As APIs do Delta Lake estão disponíveis para linguagens Java, Python e Scala. Se você estiver usando APIs Python do Delta Lake, use o empacotador de dependência archive.zip personalizado, inclua o pacote delta-spark, conforme descrito em Funcionalidade Spark-Submit no Serviço Data Flow.
Amostras de Uso
- Java ou Scala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum()
- Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history()
- SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
Exemplos
Aqui estão alguns exemplos de código para ajudá-lo a usar o Delta Lake com o serviço Data Flow
Há exemplos disponíveis nas amostras da Oracle em GitHub.