設定資料流程來處理您的內部部署日誌在 Oracle Cloud Infrastructure 中

簡介

本教學課程將佈建雲端基礎架構,然後部署 PySpark 程式碼,以便在雲端自動上傳、處理及儲存企業內部部署日誌資料。資料會在物件儲存中暫存、在資料流程中處理,然後儲存在 Autonomous Data Warehouse 進行分析。

下表顯示架構:

architecture-analyze-logs.png 圖解的描述

接著使用專為實現這個目的而設計的 Terraform 命令檔,建立 PySpark App,然後佈建 Oracle Cloud Infrastructure (OCI)。您可以從 Oracle 的 GitHub 儲存區域下載命令檔,並進行少量編輯以更新某些變數值。接著套用 Terraform 命令檔。

目標

當您完成本教學課程時,您將擁有運作系統,以便從物件儲存擷取資料、使用 PySpark App 在資料流程中處理資料,然後將結果儲存在 Autonomous Data Warehouse 中。

此處提供的 PySpark App 說明如何驗證及保護您的工作負載,以及如何準備好插入資料庫。您必須加以自訂,以符合您的資料處理需求。

必要條件

開始此教學課程之前,您必須先在 Oracle Cloud Infrastructure 中完成將資料流程 PySpark App 連線至 Autonomous Database 教學課程。該教學課程告訴您如何建立含有此教學課程所需之 Python 和 Java 程式庫的相依性歸檔。特別是,您需要該教學課程的 archive.zip 檔案。

若要完成此教學課程,您必須具備 Oracle Cloud Infrastructure 上租用戶的存取權,才能夠建立架構圖中顯示的使用者自建物件。

本教學課程將下載建立使用者自建物件 (如架構圖中所述) 的 Terraform 程式碼。此程式碼位於 GitHub 的儲存區域。您可以在本機開發機器上執行 Terraform 程式碼,或者從 Oracle Cloud Infrastructure 中的 Cloud Shell 加以執行。Cloud Shell 已安裝 Terraform 和 Git。

如果您使用本機機器,適用下列先決條件:

ADW 連線詳細資訊

若要連線至 ADW 資料庫,您必須在公事包中作為連線參數的一部分。公事包包含證明資料和其他詳細資訊,讓 Bearer 連線至資料庫。公事包的檔案名稱一般使用 wallet_<database-name>.zip 格式,但名稱可能不同。此公事包提供您資料庫連線,但是您的資料庫權限則是由您的資料庫使用者名稱和密碼決定。

本教學課程隨附的 Terraform 建立稱為日誌的永遠免費 Autonomous Data Warehouse 資料庫。系統會將資料庫的公事包儲存在兩個位置。一個位於物件儲存 (稱為 Wallet) 的儲存桶內,另一個位於執行 Terraform 程式碼之電腦上的目錄中。在這兩種情況下,公事包檔案的名稱是 wallet_logs.zip

Terraform 會在建立資料庫完成時產生 ADMIN 密碼,並在完成後向您顯示。

準備 Oracle Cloud Infrastructure 環境

建立 PySpark App 和執行 Terraform 之前,您必須進行一些 OCI 租用戶的初步組態。

您必須建立區間和您的資料流程 App 專用的使用者群組。這可讓您隔離租用戶中其他使用者的資料流程。

Terraform 可以建立區間和群組,但在此教學課程中,您將自行建立區間和群組。您可以藉此瀏覽及使用 OCI 主控台。

建立區間

建立僅包含與此管線相關之使用者自建物件、組態和其他資源的區間。

  1. 以具有管理員權限的使用者身分登入 Oracle Cloud Infrastructure 主控台。
  2. 在主控台導覽功能表中,選取識別,然後按一下區間
  3. 按一下建立區間
  4. 輸入 Dataflow 作為新區間的名稱,輸入適當的描述,並且確定父項區間是您的根區間。
  5. 按一下建立區間
  6. 建立區間之後,請記錄其 OCID。修改 terraform.tfvars 檔案時,還需要 OCID。

建立群組

您需要建立兩個群組 。一個群組包含可以管理「資料流程」的使用者,而另一個群組則包含可以使用「資料流程」但不允許管理它的使用者。

  1. 建立群組。
    1. 在主控台導覽功能表中,選取識別,然後按一下群組
    2. 按一下建立群組
    3. 輸入 Dataflow_Admin_Group 作為「群組」的名稱,輸入適當的描述,然後按一下建立
  2. 重複上一個步驟來建立另一個名為 Dataflow_User_Group 的群組。
  3. 新增使用者到 Dataflow_Admin_Group。按一下 Dataflow_Admin_Group,然後按一下新增使用者至群組,然後從清單中選取您的使用者。

記錄您的物件儲存命名空間

  1. 按一下使用者設定檔圖示,然後在開啟的功能表中,按一下您的租用戶名稱。
  2. 在開啟的頁面中,找出物件儲存命名空間的值,然後加以複製以供日後使用。

產生 API 金鑰

Terraform 使用 OCI API 在 OCI 中建立及管理基礎架構。若要這麼做,必須要有 API 金鑰組的公開金鑰與指紋。

若要產生自己的公用 / 私密金鑰組:

  1. 按一下使用者設定檔圖示,然後在開啟的功能表中,按一下使用者設定值
  2. 在「使用者詳細資訊」頁面上,查看「資源」區段,然後按一下 API 金鑰
  3. 按一下新增 API 金鑰
  4. 在開啟的「新增 API 金鑰」面板中,按一下下載私密金鑰
  5. 在 [Configuration File Preview] (配置檔案預覽) 中,複製文字方塊的內容,並將它們儲存在方便的位置。您可以在稍後設定 Terraform 時使用此資訊。
  6. 按一下關閉
  7. 重新命名已下載的私密金鑰檔案。使用簡單的名稱,例如 oci_api_key.pem

新增資料庫管理員密碼至保存庫

您必須為此資料庫 ADMIN 密碼建立保存庫加密密碼,並記錄 OCID。您在稍後建立的 PySpark 應用程式中需要這個 PASSWORD_SECRET_OCID 常數。

目前,在保存庫加密密碼中輸入假密碼。這是因為您尚未知道密碼是什麼。您直到 Terraform 完成後才會知道密碼。但您需要 Terraform 上傳至物件儲存之 PySpark App 的加密密碼 OCID。因此,請先建立加密密碼、記錄其 OCID,然後在 Terraform 完成後,再以真實密碼更新加密密碼。

  1. 前往安全性,然後按一下 Vault
  2. 在「清單範圍 (List Scope)」區段中,確定位於資料流程區間。
  3. 按一下建立保存庫
  4. 在開啟的面板中,在名稱欄位中輸入資料流程
  5. 按一下建立保存庫
  6. 當 Vault 狀態變成作用中時,按一下資料流程以開啟「保存庫詳細資訊」頁面。
  7. 在「主要加密」區段中,按一下建立金鑰
  8. 在「建立金鑰」面板中,於名稱欄位中輸入資料流程
  9. 按一下建立金鑰
  10. 在「資源 (Resource)」區段中,按一下安全 (Secrets)
  11. 按一下建立加密密碼
  12. 在「建立加密密碼」對話方塊中,從「建立於區間」清單中選擇區間。
  13. 名稱欄位中,輸入識別加密密碼的名稱。請避免輸入機密資訊。
  14. 描述欄位中,輸入簡短的加密密碼描述以協助識別。請避免輸入任何機密資訊。
  15. 加密金鑰欄位中,選取資料流程
  16. 安全性類型範本欄位中,選取純文字
  17. 加密密碼內容欄位中,輸入假密碼。
  18. 按一下建立加密密碼
  19. 面板關閉後,按一下「資料流程加密密碼」以開啟詳細資訊頁面,然後複製 OCID。

建立 PySpark App

PySpark App 建立一個 Spark 階段作業,該階段作業從物件儲存讀取日誌資料、將其轉換成資料範圍,然後將資料範圍儲存在 ADW 的表格中。

建立 Spark 階段作業之前,您必須確定已匯入某些重要模組,並設定一些常數供稍後使用。

  1. 使用下列陳述式來匯入相關模組。

    import os
    import oci
    import base64
    import zipfile
    from urllib.parse import urlparse
    
    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import regexp_extract
    
  2. 這些是您在 main() 函數中設定的常數。

    確定您為 OBJECT_STORAGE_NAMESPACE 和 PASSWORD_SECRET_OCID 輸入自己的值。

    OBJECT_STORAGE_NAMESPACE = "YOUR-OBJECT-STORAGE-NAMESPACE"
    INPUT_PATH = "oci://data@{}/sample_logs.log".format(OBJECT_STORAGE_NAMESPACE)
    DATABASE_NAME = "logs"
    PASSWORD_SECRET_OCID = "ocid1.vaultsecret..... "
    WALLET_PATH = "oci://Wallet@{}/wallet_{}.zip".format(OBJECT_STORAGE_NAMESPACE,DATABASE_NAME)
    TNS_NAME = "{}_high".format(DATABASE_NAME)
    USER="ADMIN"
    TARGET_DATABASE_TABLE="processed_logs"
    
  3. 設定 Spark 階段作業並從物件儲存載入資料。資料會以純文字檔案的形式,載入至 Spark 資料主機。

    spark_session = SparkSession.builder.appName("Dataflow").getOrCreate()
    input_df = spark_session.read.text(INPUT_PATH)
    input_df.show(5, truncate=False) # Some output for the Data Flow log file
    

    資料範圍看起來應該像下列範例:

    +----------------------------------------------------------------------------------+
    |value                                                                             |
    +----------------------------------------------------------------------------------+
    |10.0.0.1 - user1 [10/Mar/2021:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 |
    |10.0.0.2 - user2 [10/Mar/2021:14:55:36 -0700] "GET /file1.html HTTP/1.0" 200 9889 |
    |10.0.0.3 - user3 [10/Mar/2021:14:55:37 -0700] "GET /file2.html HTTP/1.0" 200 4242 |
    |10.0.0.4 - user4 [10/Mar/2021:14:56:36 -0700] "GET /file3.html HTTP/1.0" 200 10267|
    |10.0.0.1 - user1 [10/Mar/2021:15:05:36 -0700] "GET /file4.html HTTP/1.0" 200 15678|
    +----------------------------------------------------------------------------------+
    

    如您所見,資料是由單一資料欄 value 所組成,每個日誌項目均包含一個單一字串。資料範圍必須具有數個資料欄,一個資料欄才能夠將資料傳輸至資料庫。

  4. 將時間範圍分割為資料欄。

    在下列範例中,user-identifier 欄位 (通常是 "-「) 不會包含在新資料範圍內。

    hostname = r'(^\S+\.[\S+\.]+\S+)\s'
    user = r'\s+.+\s+(.+)\s+\['
    timestamp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
    method_uri_protocol = r'\"(\S+)\s(\S+)\s*(\S*)\"'
    status = r'\s(\d{3})\s'
    content_size = r'\s(\d+)$'
    
    logdata_df = input_df.select(
        regexp_extract('value', hostname, 1).alias('hostname'),
        regexp_extract('value', user, 1).alias('user'),
        regexp_extract('value', timestamp, 1).alias('timestamp'),
        regexp_extract('value', method_uri_protocol, 1).alias('method'),
        regexp_extract('value', method_uri_protocol, 2).alias('URI'),
        regexp_extract('value', method_uri_protocol, 3).alias('protocol'),
        regexp_extract('value', status, 1).cast('integer').alias('status'),
        regexp_extract('value', content_size, 1).cast('integer').alias('content_size'))
    

    您也可以套用一或多個機器學習模型的位置。例如,您可能想要在將日誌儲存在資料庫之前,先執行入侵偵測工作負載。

  5. 擷取您 Autonomous Data Warehouse 資料庫的證明資料。

    本教學課程隨附的 Terraform 程式碼會建立自治式資料倉儲,並在物件儲存中儲存公事包檔案。PySpark 應用程式必須擷取公事包並使用其資訊,才能連線至 ADW。

    # Get an Object Store client
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark_session.sparkContext.getConf().get(token_key)
    with open(token_path) as fd:
        delegation_token = fd.read()
    signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
        delegation_token=delegation_token)
    object_store_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
    
    # Extract the wallet file location info
    split_url = urlparse(WALLET_PATH)
    bucket_name, namespace = split_url.netloc.split("@")
    file_name = split_url.path[1:]
    
    # Get the wallet from Object Storage.
    # The response contains the wallet and some metadata
    response = object_store_client.get_object(namespace, bucket_name, file_name)
    
    # Extract the wallet from response and store it in the Spark work-dir
    wallet_path = "/opt/spark/work-dir/"
    zip_file_path = os.path.join(wallet_path, "temp.zip")
    with open(zip_file_path, "wb") as fd:
        for chunk in response.data.raw.stream(1024 * 1024, decode_content=False):
            fd.write(chunk)
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(wallet_path)
    
    # Extract the wallet contents and add the files to the Spark context
    contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split()
    for file in contents:
        spark.sparkContext.addFile(os.path.join(wallet_path, file))
    
    
  6. 將資料儲存在 Autonomous Data Warehouse 資料庫中。

    下列程式碼片段顯示如何連線至 ADW,並將資料範圍寫入目標表格。如果表格不存在,就會建立表格。如果表格存在,會因為模式參數中的「覆寫」選項而刪除並重新建立表格。

    密碼是從 Oracle Cloud Infrastructure Vault 擷取。在執行 Terraform 程式碼之後,請將密碼放在 Vault。在執行 Terraform 之後的步驟中,系統將在 Vault 中輸入密碼。

    adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path)
    
    # Retrieve the database password for user 'ADMIN' from OCI Vault
    # The password is stored as base64 text, so it must be decoded
    with open(token_path) as fd:
        delegation_token = fd.read()
    signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
        delegation_token=delegation_token)
    secrets_client = oci.secrets.SecretsClient(config={}, signer=signer)
    response = secrets_client.get_secret_bundle(PASSWORD_SECRET_OCID)
    
    base64_password = response.data.secret_bundle_content.content
    base64_secret_bytes = base64_password.encode("ascii")
    base64_message_bytes = base64.b64decode(base64_secret_bytes)
    password = base64_message_bytes.decode("ascii")
    
    # Set up some properties for connecting to the database
    properties = {
        "driver": "oracle.jdbc.driver.OracleDriver",
        "oracle.net.tns_admin": TNS_NAME,
        "password": password,
        "user": USER,
    }
    
    # Write the dataframe to the database
    adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path)
    logdata_df.write.jdbc(url=adw_url, table=TARGET_DATABASE_TABLE, 
        mode="Overwrite", properties=properties)
    

以名稱 dataflow-app.py 儲存 PySpark App。

下載並套用 Terraform 程式碼

GitHub 上的 Oracle quickstart 儲存區域中會提供 Terraform 程式碼。此程式碼提供架構圖中顯示的元件。

下載並套用程式碼:

  1. 變更至方便目錄並複製儲存區域。

    git clone https://github.com/oracle-quickstart/oci-arch-dataflow-store-analyze-data

  2. 將相依性檔案 archive.zip 複製到 oci-arch-dataflow-store-analyze-data 目錄中。

    注意:如果您沒有 archive.zip 檔案,請參閱先決條件小節,瞭解如何取得檔案。

  3. 變更至 oci-arch-dataflow-store-analyze-data 目錄。

    cd oci-arch-dataflow-store-analyze-data

  4. 找出 PySpark App 檔案 dataflow-app.py (它是 stub 檔案),並將它取代為您撰寫的檔案。

  5. 在文字編輯器中開啟 terraform.tfvars 檔案。

  6. 更新 terraform.tfvarsobject_storage_namespace 的值。請使用您先前記錄的值。

  7. 視您使用的電腦作業系統而定,修改 env-vars.batenv-vars.sh。這些命令檔是用來設定 Terraform 使用的環境變數。

    在 Windows 上,env-vars.bat 檔案應與下列範例類似。請務必貼到自己租用戶的值。

    @echo off
    set TF_VAR_tenancy_ocid=ocid1.tenancy.oc1..
    set TF_VAR_user_ocid=ocid1.user.oc1..
    set TF_VAR_compartment_ocid=ocid1.compartment.oc1..
    set TF_VAR_private_key_path=%HOMEPATH%\.ssh\oci_api_key.pem
    set TF_VAR_fingerprint=2a:b4:ef:9c:f1...
    set TF_VAR_region=ca-toronto-1
    

    在 Linux 和 Mac 上,env-vars.sh 檔案應與下列範例類似。請務必貼到自己租用戶的值。

    export TF_VAR_tenancy_ocid=ocid1.tenancy.oc1..
    export TF_VAR_user_ocid=ocid1.user.oc1..
    export TF_VAR_compartment_ocid=ocid1.compartment.oc1..
    export TF_VAR_private_key_path=~/.ssh/oci_api_key.pem
    export TF_VAR_fingerprint=2a:b4:ef:9c:f1...
    export TF_VAR_region=ca-toronto-1
    

    以自己的租用戶和使用者資訊更新檔案之後,請確定環境變數已設定。

    在 Windows 上,執行下列命令:

    env-vars.bat
    

    在 Linux 或 Mac 上,執行下列命令:

    source env-vars.sh
    
  8. 在終端機中,執行 init 指令

    terraform init

  9. 在 Oracle Infrastructure Cloud 佈建資源。

    terraform apply

    完成之後,Terraform App 會顯示建立之資料庫的 ADMIN 密碼。

  10. 複製 Terraform 應用程式顯示的密碼。

  11. 確認已經佈建資源。

    1. 前往您的 OCI 主控台並開啟導覽功能表。
    2. 向下捲動至治理與管理區段,展開管理,然後按一下租用戶總管
    3. 選取您先前建立的資料流程區間。

    在數秒鐘之後,資源就會出現在可篩選和排序的表格中。

更新保存庫中的 ADMIN 密碼

您先前已將假密碼新增至 OCI 保存庫,以取得用於資料流程中執行之 PySpark App 的 OCID。現在您必須將該密碼更新為真實值。完成 Terraform 套用命令時,真正的值會顯示在您的終端機中。

  1. 在主控台導覽功能表中,選取安全性,然後按一下錯誤
  2. 按一下先前建立的保存庫名稱。
  3. 在開啟的「 Vault 詳細資訊」頁面中,查看「資源」區段,然後按一下安全
  4. 按一下先前建立的加密密碼名稱。
  5. 按一下建立加密密碼版本
  6. 確定秘密類型樣板設為純文字,然後將資料庫 ADMIN 密碼貼到加密密碼內容方塊中。
  7. 按一下建立加密密碼版本

執行資料流程應用程式

現在就是執行 PySpark App 的時候。

  1. 在主控台導覽功能表中,選取資料流程,然後按一下應用程式
  2. 按一下分析內部部署日誌以開啟「應用程式詳細資訊」頁面。
  3. 按一下執行以開啟「執行 Python 應用程式」面板。
  4. 按一下執行

「資料流程執行」頁面會開啟,您可以在其中監督執行的進度。配置資源需要幾分鐘的時間,並將所有資源運轉。執行完成後,按一下執行的名稱以開啟「執行詳細資料」頁面。

在「執行詳細資訊 (Run Details)」頁面中,您可以下載並檢查產生的日誌。如果因故執行失敗,您可以檢查日誌以判斷問題來源。

致謝

深入瞭解

探索 docs.oracle.com/learn 上的其他實驗室,或是存取更多免費學習內容至 Oracle Learning YouTube 通道。此外,瀏覽 education.oracle.com/learning-explorer 以成為 Oracle Learning Explorer。

如需產品文件,請瀏覽 Oracle Help Center

其他學習資源

探索 docs.oracle.com/learn 上的其他實驗室,或是存取更多免費學習內容至 Oracle Learning YouTube 通道。此外,瀏覽 education.oracle.com/learning-explorer 以成為 Oracle Learning Explorer。

如需產品文件,請瀏覽 Oracle Help Center