データ・フローとDelta Lake
データ・フローでは、アプリケーションでSpark 3.2.1以降を実行すると、デフォルトでDelta Lakeがサポートされます。
Delta Lakeでは、データ・レイク上にレイクハウス・アーキテクチャを構築できます。Delta Lakeでは、ACIDトランザクションとスケーラブルなメタデータ処理が実現されており、既存のデータ・レイク上にストリーミングとバッチ・データ処理が統合されます。Delta Lake 3.1.0は、データ・フローSpark 3.5.0処理エンジンでサポートされており、データ・フローSpark 3.2.1処理エンジンではDelta Lake 2.0.1および1.2.1がサポートされています。
- データ・フローのSparkバージョンは3.2.1 (以降)である必要があります。
- delta形式を使用します。
Delta Lakeのロード
データ・フローで使用するDelta Lakeをロードするには、次のステップに従います。
spark.oracle.deltalake.version
を使用して、使用するDelta Lakeのバージョンを指定します。これは、次のいずれかの値に設定します。Sparkバージョン | spark.oracle.deltalake.versionの値 | バイナリがロードされました |
---|---|---|
3.5.0 | 3.1.0 |
Delta Lake 3.1.0 |
3.2.1 | 2.0.1 |
Delta Lake 2.0.1 |
3.2.1 | 1.2.1 |
Delta Lake 1.2.1 |
3.5.0, 3.2.1 | none |
Delta Lakeバイナリはロードされていないため、指定する必要があります。 |
spark.oracle.deltalake.version
の値を設定しない場合、デフォルトでDelta Lake 1.2.1バイナリがロードされます。spark.oracle.deltalake.version
をnone
に設定した場合は、アプリケーションJARの一部としてDelta Lake依存性ライブラリを指定する必要があります。詳細は、Delta Lakeのパブリック・ドキュメンテーションを参照してください。
- デルタ・ストレージ-3.1.0.jar
- デルタ・スパーク_2.12-3.1.0.jar
- delta-contribs_2.12-3.1.0.jar
- JavaまたはScalaアプリケーションの場合は、mavenリポジトリからのDelta Lake 3.1.0依存関係を指定します。または、Pythonアプリケーションの場合、Delta Lakeライブラリをパッケージ化し、アプリケーションに提供します。
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency>
- Delta Lakeを有効にするようにSpark構成を設定します:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- デルタコア_2.12-2.0.1.jar
- delta-contribs_2.12-2.0.1.jar
- デルタ・ストレージ-2.0.1.jar
- JavaまたはScalaアプリケーションの場合は、mavenリポジトリからのDelta Lake 2.0.1依存関係を指定します。または、Pythonアプリケーションの場合、Delta Lakeライブラリをパッケージ化し、アプリケーションに提供します。
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency>
- Delta Lakeを有効にするようにSpark構成を設定します:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Delta Lake APIの使用例
データ・フローでのDelta Lake APIの使用例。
データ・フローSparkエンジンでは、デフォルトでdelta
形式がサポートされています。Delta Lake APIは、Java、PythonおよびScala言語で使用できます。Delta Lake Python APIを使用している場合は、データ・フローのSpark-Submit機能の説明に従って、カスタムのarchive.zip依存関係パッケージャを使用して、デルタ・スパーク・パッケージを含めます。
使用例
- JavaまたはScala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum()
- Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history()
- SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
例
ここでは、データ・フローでDelta Lakeの使用を開始する上で役立つコード例を示します
GitHubのoracle-samplesで、例を参照できます。