關於串流處理

您可以在 Oracle AI Data Platform Workbench 中使用 Apache Spark Structured Streaming 功能,以近乎即時的方式處理串流資料或持續產生資料。

筆記型電腦和工作流程都支援 Apache Spark 結構化串流處理。您可以使用下列來源和接收器來讀取串流資料、將串流資料寫入檢查點位置和檢查點位置。

表格 16-1 支援的來源與接收器

來源或槽 Stencils 支援服務?
磁碟區路徑 (/Volume/bronze/bucket1) 支援所有格式
工作區路徑 (/Workspace/folder1/) 支援所有格式
目錄中含有三個零件名稱的表格 (catalog.schema.table) 僅支援差異格式

Parquet、CSV、JSON、ORC 格式不支援

範例 1:支援的程式碼

  • streaming_df = spark.readStream.format("delta").table('stdcatalog.stdschema.deltatable')
  • streaming_df.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/Volumes/checkpoints1/").toTable("stdcatalog.stdschema.deltatable")

範例 2:不支援的程式碼

  • spark.readStream.option("withEventTimeOrder", "true").format("format") .table("stdcatalog.stdschema.samplecsv")
Kafka 支援任何不具三個部分命名慣例的 Kafka 相容串流

遵循三部分命名慣例,Kafka 型錄不支援

OCI Streaming 服務 支援的
OCI 物件儲存路徑 (使用 OCI://) 不支援
Oracle Autonomous AI LakehouseOracle AI DatabaseOracle Autonomous AI Transaction Processing 不支援串流處理 (readStream 或 writeStream)

使用記事本的結構化串流

您可以撰寫 Python 程式碼來處理記事本中的串流資料。磁碟區路徑或工作區路徑有效作為檢查點位置,但不支援物件儲存路徑 (oci:// 格式) 作為檢查點位置。建議您使用磁碟區路徑作為檢查點位置。


AI Data Platform Workbench 記事本儲存格中的串流程式碼範例


在 AI Data Platform Workbench 筆記型電腦中用來處理串流資料的 Python 程式碼範例

您可以在執行串流程式碼時,從筆記型電腦的儀表板頁籤查看 Apache Spark 串流處理相關事件,例如輸入速率、處理速率以及批次持續時間。


已開啟記事本中的「儀表板」頁籤,以顯示串流資料

您也可以在增量開發程式碼時,從原始資料頁籤檢視原始串流相關事件。


「原始資料」頁籤會在顯示串流相關事件的記事本中開啟