实施 Oracle Modern Data Platform for Business Reporting and Forecasting

本节介绍如何供应 OCI 服务、实施用例、可视化数据以及创建预测。

预配 OCI 服务

下面我们将介绍为简单演示此用例预配资源的主要步骤。

  1. 创建 OCI 对象存储存储桶。
    有关详细信息,请参阅创建存储桶
  2. 预配 Oracle Big Data ServiceAutonomous Data Warehouse

实施 Oracle Modern Data Platform 用例

在介绍实施细节之前,让我们看看整体流程。

  1. 原始数据将驻留在 OCI 对象存储中。
  2. Oracle Big Data Service 上原生运行的 Spark 将从 OCI 对象存储读取数据。它将清理、转换和匿名化数据,最终将数据保留到 Autonomous Data Warehouse
  3. 用户将利用 Oracle Analytics Cloud 来可视化数据和进行预测。


oci-modern-data-reporting-flow-oracle.zip

注意:

所选的数据流是一个简单的例子,实际的业务用例可能会更多地涉及。

以下部分介绍了如何上载示例数据、配置 OCI Object StorageOracle Big Data Service 以及调用简单的 Spark scala 作业来转换数据以及将数据移动到 Autonomous Data Warehouse

  1. 获取示例数据并上载到您创建的 OCI 对象存储存储桶。此解决方案中用于演示实施的数据是从 Kaggle 下载的时间序列销售数据。此数据集中的每条记录都有以下字段:
    1. 订单标识
    2. 产品名称
    3. Quantity Ordered
    4. 销售价格
    5. 订单日期
    6. 采购地址
    有关示例数据的更多信息,请参阅浏览更多部分中的 Kaggle:示例销售数据链接。
  2. 配置 OCI 对象存储访问:Oracle Big Data Service 集群需要访问对象存储存储桶才能创建外部 Hive 表。
    有关实施详细信息,请按照本文档中的步骤操作:使用对象存储 API 密钥
  3. 登录 Oracle Big Data Service 并调用 Hive。基于包含示例销售数据的对象存储桶创建外部表。
    create external table if not exists sales_obj_store
    (Order_ID bigint,
    Product string,
    Quantity_Ordered int,
    PriceEach double,
    Order_Date string,
    Purchase_Address string)
    row format delimited
    fields terminated by ','
    stored as TEXTFILE
    location 'oci://bucket_namej@namespace/sales_data/'
  4. 确认可以使用上面的外部表查询数据。
     select * from sales_obj_store limit 20;
  5. 配置 Oracle Big Data ServiceAutonomous Data Warehouse 的访问权限:下载 Autonomous Data Warehouse wallet 文件,导航到 Autonomous Data Warehouse 集群,单击数据库连接并下载 wallet 文件。将 wallet 存储在 Oracle Big Data Service 集群上的目录中。
  6. 下载 Oracle Big Data Service 集群上的 Oracle JDBC 驱动程序和相关 jar 文件。可以从以下链接下载驱动程序:Oracle Database JDBC 驱动程序和 Companion Jars 下载。调用 Spark 作业时传入这些 jar。
  7. 调用简单的 Spark scala 作业来转换数据并将数据移动到 Autonomous Data Warehouse
    1. 此作业执行简单转换,将字符串字段转换为时间戳字段。
    2. 此作业还通过删除完整的客户地址并从中提取 zip 字段来匿名化数据。
    import com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider
    
    import java.util.Properties
    
    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.SparkSession
    import oracle.jdbc.OracleConnection
    
    val jdbcUrl = s"jdbc:oracle:thin:@(description=(retry_count=20)(retry_delay=3)(address=(protocol=tcps)(port=1522)(host=adb.us-ashburn-1.oraclecloud.com))(connect_data=(service_name=xxxx_high.adb.oraclecloud.com))(security=(ssl_server_dn_match=yes)))?TNS_ADMIN=/home/xxx/wallet"
    
    val connectionProperties = new Properties()
    var options = collection.mutable.Map[String, String]()
    options += ("driver" -> "oracle.jdbc.driver.OracleDriver")
    options += ("url" -> jdbcUrl)
    options += (OracleConnection.CONNECTION_PROPERTY_USER_NAME -> "xxxx")
    options += (OracleConnection.CONNECTION_PROPERTY_PASSWORD -> "xxxxx")
    options += (OracleConnection.CONNECTION_PROPERTY_TNS_ADMIN -> "/home/xxx/wallet")
    options += ("dbtable" -> "schema_name.sales")
    
    /* read from hive */
    val dfsales = spark.sql("select * from sales")
    
    /* transform */
    val ts = to_timestamp($"order_date", "MM/dd/yy HH:mm")
    val dfsales_orcl = dfsales.withColumn("order_date",ts)
    val dfsales_orcl_zip = dfsales_orcl.select(col("*"), substring_index(col("purchase_address"), " ", -1).as("zip"))
    val dfsales_orcl_transformed = dfsales_orcl_zip.drop("purchase_address")
    
    /* write to ADW */
    dfsales_orcl_transformed.write.format("jdbc").options(options).mode("append").save()
数据转换作业完成后,确认可以在 Autonomous Data Warehouse 中查看数据。