ストリーミングについて

Apache Spark Structured Streaming機能を使用して、Oracle AI Data Platform Workbenchでストリーミング・データまたは継続的に生成されたデータをほぼリアルタイムで処理できます。

ノートブックとワークフローの両方で、Apache Spark構造化ストリーミングがサポートされています。次のソースおよびシンクは、ストリーム・データの読取り、ストリーム・データの書込みおよびチェックポイントの場所に使用できます。

表16-1サポートされているソースおよびシンク

ソースまたはシンク サポート済?
ボリューム・パス(/Volume/bronze/bucket1) すべての形式でサポートされます
ワークスペース・パス(/Workspace/folder1/) すべての形式でサポートされます
3つの部分名を持つカタログ内の表(catalog.schema.table) Delta形式のみサポート

Parquet、CSV、JSON、ORC形式ではサポートされていません

例1: サポートされているコード

  • streaming_df = spark.readStream.format("delta").table('stdcatalog.stdschema.deltatable')
  • streaming_df.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/Volumes/checkpoints1/").toTable("stdcatalog.stdschema.deltatable")

例2: サポートされていないコード

  • spark.readStream.option("withEventTimeOrder", "true").format("format") .table("stdcatalog.stdschema.samplecsv")
Kafka 3つの部分からなる命名規則のない任意のKafka互換ストリームでサポートされます。

3つの部分からなる命名規則に従い、Kafkaベースのカタログではサポートされていません)

OCIストリーミングサービス サポートされています
OCIオブジェクト・ストレージ・パス(OCI://を使用) 非サポート
Oracle Autonomous AI LakehouseOracle AI DatabaseOracle Autonomous AI Transaction Processing ストリーミング(readStreamまたはwriteStream)ではサポートされていません

ノートブックを使用した構造化ストリーミング

Pythonコードを記述して、ノートブック内のストリーム・データを処理できます。ボリューム・パスまたはワークスペース・パスのいずれかがチェックポイントの場所として有効ですが、オブジェクト・ストレージ・パス(oci://形式)はチェックポイントの場所としてサポートされていません。チェックポイントの場所としてボリューム・パスを使用することをお薦めします。


AI Data Platform Workbenchノートブック・セルでのストリーミング・コードの例


AI Data Platform Workbenchノートブックのストリーム・データの処理に使用されるPythonコードの例

ストリーミング・コードの実行中に、ノートブックの「ダッシュボード」タブから、入力レート、処理レート、バッチ期間などのApache Sparkストリーミング関連のイベントを表示できます。


ノートブックの「ダッシュボード」タブが開き、ストリーミング・データが表示されます

コードを段階的に開発しながら、「RAWデータ」タブからRAWストリーミング関連のイベントを表示することもできます。


ストリーミング関連のイベントを表示するノートブックで「Raw Data」タブが開きます