关于流处理

您可以使用 Apache Spark 结构化流处理功能,在 Oracle AI Data Platform Workbench 中近乎实时地处理流数据或持续生成的数据。

笔记本和工作流都支持 Apache Spark 结构化流处理。可以使用以下源和接收器从检查点位置读取流数据,将流数据写入到检查点位置。

表 16-1 支持的源和汇

源或接收器 支持?
卷路径 (/Volume/bronze/bucket1) 支持所有格式
工作区路径 (/Workspace/folder1/) 支持所有格式
目录中具有三个部分名称的表 (catalog.schema.table) 仅支持 Delta 格式

Parquet、CSV、JSON、ORC 格式不支持

示例 1:支持的代码

  • streaming_df = spark.readStream.format("delta").table('stdcatalog.stdschema.deltatable')
  • streaming_df.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/Volumes/checkpoints1/").toTable("stdcatalog.stdschema.deltatable")

示例 2:不支持的代码

  • spark.readStream.option("withEventTimeOrder", "true").format("format") .table("stdcatalog.stdschema.samplecsv")
Kafka 支持任何 Kafka 兼容流,不含三部分命名惯例

根据三部分命名约定,不支持基于 Kafka 的目录 )

OCI Streaming 服务 受支持
OCI 对象存储路径(使用 OCI://) 不支持
Oracle Autonomous AI LakehouseOracle AI DatabaseOracle Autonomous AI Transaction Processing 不支持流处理(readStream 或 writeStream)

使用记事本的结构化流处理

您可以编写 Python 代码来处理笔记本中的流数据。卷路径或工作区路径作为检查点位置有效,但对象存储路径(oci:// 格式)不支持作为检查点位置。我们建议将卷路径用作检查点位置。


AI Data Platform Workbench 记事本单元格中的流代码示例


用于在 AI Data Platform Workbench 记事本中处理流数据的 Python 代码示例

在运行流代码时,您可以从记事本中的仪表盘选项卡中查看与 Apache Spark 流相关的事件,例如输入速率、处理速率和批处理持续时间。


记事本中用于显示流数据的仪表盘选项卡

您还可以在逐步开发代码时从原始数据选项卡中查看与原始流相关的事件。


在显示流相关事件的记事本中打开“原始数据”选项卡