注意:

使用 Oracle Cloud Infrastructure Streaming 和 OCI 資料流程以及微批次處理串流 AVRO 訊息

簡介

在現今的資料導向環境中,處理和分析即時資料串流的能力,對於希望獲得洞察力並快速因應不斷變化狀況的企業來說至關重要。串流資料處理技術已成為處理大量連續資料串流的熱門解決方案。在本教學課程中,我們將探索一種創新方法,以 Oracle Cloud Infrastructure (OCI) Streaming 有效率地串流 AVRO 訊息,再加上微批次處理技術,並根據開源 FN 專案使用無伺服器功能增強 Oracle Functions 的功能。

AVRO 和串流資料簡介

AVRO 是一種廣泛採用的資料序列化格式,其效率可代表資料結構的複雜性以及與各種程式設計語言的相容性。與串流技術整合後,AVRO 能讓組織幾乎即時傳輸和處理資料,進而擷取寶貴的洞察分析,而不會發生與批次處理相關的延遲。

OCI Streaming:強化即時資料

Oracle Cloud Infrastructure (OCI) 提供一組工具來處理雲端中的資料,OCI Streaming 則是針對高傳輸量即時資料串流量身打造的這類服務。藉由運用 OCI Streaming,開發人員可以建構可有效率地擷取、處理及分配資料串流的可擴展且可靠的資料管線。

OCI 資料流程管理的 Spark:無鎖定式解決方案

Oracle Cloud Infrastructure (OCI) Data Flow 是一個完全託管的 Apache Spark 服務,可在極大型的資料集上執行處理工作,而不需要部署或管理基礎架構。

微批次處理微批次處理涉及使用時間或大小作為條件,將內送資料串流細分為精簡批次。接著會將這些批次處理為較小的工作。與串流處理中記錄的常數和個別處理不同,微批次處理會在處理前導入一些延遲和儲存,讓使用者能夠更進一步控制要如何處理資料。與以間隔處理大數據集的傳統批次處理不同,微批次處理提供了近乎即時的處理和交付結果。

解除鎖定 Synergy:OCI Streaming、OCI Data Flow 和 Oracle Functions

本教學課程深入解說 OCI Streaming、OCI Data Flow 管理的 Spark Streaming 和 Oracle Functions 的融合。我們將引導您完成設定端對端串流資料管線以擷取 AVRO 編碼的訊息、使用 OCI Data Flow 託管的 Spark 微批次處理功能有效地處理這些訊息,並導入 Oracle Functions 的無伺服器事件導向處理。

目標

使用 OCI Streaming 和 OCI Data Flow 管理的 Spark 微批次處理,以使用 AVRO 格式建立有效率的即時資料處理管線。

重要事項:本教學課程僅針對教育和研究目的而設計。它提供一個讓學員實驗的環境,並在受控制的環境中取得實際的體驗。請注意,本實驗室使用的安全組態和措施可能不適合實際情況。

實際應用系統的安全考量通常都較為複雜且動態。因此,在實作生產環境中示範的任何技術或組態之前,進行全面性的安全評估和檢閱至關重要。此複查應涵蓋安全的所有層面,包括存取控制、加密、監督與合規,以確保系統符合組織的安全原則和標準。

從實驗室環境轉換到實際部署時,安全性永遠是首要任務。

處理流程
T0_1

高層次架構
T0_1

先決條件 - Oracle Cloud Infrastructure

先決條件 - 本機機器環境

作業 1:設定動態群組

  1. 前往您的網域,按一下動態群組並建立下列群組。

    群組名稱:MyFunctions

    ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
    

    群組名稱:ContainerIntances

    ALL {resource.type='compute-container-instances',  resource.compartment.id = 'pasteYourCompartmentID'}
    

    群組名稱:DataFlowDynamicGroup

    ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
    

作業 2:建立原則

作業 3:建立儲存的儲存桶及上傳 AVRO 綱要

  1. 前往儲存桶,按一下建立儲存桶,然後建立一個名為 avro-schema-bucket 的新儲存桶,以儲存 AVRO 綱要檔案。

    T3_1

  2. 現在,選取您的儲存桶,並在命名空間中標註,之後將需要使用該儲存桶。

    T3_1

  3. 將檔案 user.asvc 上傳至此建立的儲存桶。

    T3_1

作業 4:建立專用 OCI 串流主題

  1. 前往分析和 AI ,接著按一下串流,然後建立一個名為 FrontDoorTopic 的新串流。

    T4_0

  2. 選取串流集區,按一下 PrivatePool ,然後按一下 Kafka 連線設定值選項和欄位的注意事項,我們之後將需要它。

    T4_0

作業 5:建立 AUTH TOKEN

為您的使用者建立 AUTH TOKEN,此為使用 Kafka 主題的必要項目

  1. 按一下右上角的使用者圖示,然後選取使用者設定值選項。

  2. 按一下認證權杖,然後產生新的權杖和您權杖的安全注意事項

    T4_1

作業 6:建立容器登錄

  1. 前往開發人員服務功能表,按一下容器登錄並建立下列專用儲存區域。

    儲存區域名稱 類型
    api-avro-sample_a 私人
    api-avro-sample_b 私人
    fn-recep-avro 鍵 私人
  2. 檢查命名空間的儲存區域和警告

    T6_1

  3. 開啟已安裝 OCI CLI 與 Docker 的終端機 Shell,然後繼續登入登錄。請檢查您 REGION 的正確 URL。在本教學課程中,我們使用巴西東部 (聖保羅) ,其中登錄 URL 為 gru.ocir.io

    docker login gru.ocir.io
    Username: <your container namespace>/youruser
    Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
    

    T6_1

作業 7:建立 OCI 保存庫

建立 OCI 保存庫,並且提供此教學課程之後將使用的必要變數。

  1. 前往識別與安全,按一下保存庫,然後按一下建立保存庫

    T7_1new

  2. 選取新保存庫,然後為其建立主要加密金鑰

    T7_1new

  3. 建立一個稱為 AUTH_KEY 的新加密密碼,然後貼上稍早建立的認證金鑰。

    T7_1new

  4. 重複建立加密密碼處理作業並建立下列新加密密碼:

    變數名稱
    KAFKA_BOOTSTRAPSERVER 「OCI Streaming 組態的大幅增加伺服器」
    KAFKA_TOPIC “FrontDoorTopic”
    KAFKA_USERNAME 「您的 OCI Streaming 組態使用者名稱 + 串流 ID」
    AUTH_KEY 「您先前步驟建立的 AUTH 權杖」
  5. 注意為每個加密密碼建立的加密 OCID ,並建立新的組態檔。

    • config.properties 檔案包含從應用程式到保存庫加密密碼 OCID 的對應變數。應用程式將使用此檔案來識別在程式實際執行期間需要收集哪些保存庫加密密碼。

    • 在您的本機機器上建立可存取 OCI-CLI 的新檔案:
      以您的 OCID 取代每秒
      檔案名稱:config.properties

      kafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx
      kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
      
  6. 前往儲存桶,按一下建立儲存桶,然後建立一個名為 config 的新儲存桶,以儲存 config.properties 檔案。

  7. config.properties 檔案上傳到儲存的儲存桶 config

    ls -lrt config.properties
    oci os object put -bn config --file config.properties --force
    

    T7_1new

作業 8:建立簡單的 AVRO 訊息,並使用提供的範例 python 程式碼將訊息儲存至檔案

  1. 開啟已安裝 OCI CLI、Docker 和 Python3 的 Shell 終端機,然後根據先前上傳至物件儲存中的相同 AVRO 綱要,建立包含單一訊息的新 AVRO 檔案。

    注意:您必須檢查 Python 版本,在此教學課程中使用 Python 3.9.16 ,較舊的版本可能無法使用。

  2. 請從此處取得驗證碼:Create_avro_sample.zip

  3. 將其解壓縮至您選擇的位置,然後執行程式以產生範例 AVRO 訊息:

    cd ~
    mkdir create_avro_sample
    cd create_avro_sample
    unzip CreateAVRO_SampleFile.zip
    # Check the files are there
    ls -lrt
    # install the python dependencies on requirements.txt
    pip3 install -r requirements.txt
    # Run the program and create an AVRO message file
    python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
    

    T8_1 T8_1 T8_1

作業 9:建立 OCI 函數以接收 AVRO 訊息並發布至 OCI Streaming 主題

  1. 前往開發人員服務函數底下,按一下應用程式,然後按一下建立應用程式

    T9_1

  2. 請前往已安裝 Docker、OCI CLI、FN CLI 的終端機 Shell,然後執行下列命令來初始化函數。

    注意:如果您遵循這些步驟,您的 Docker 登入命令現在已經執行 (如果尚未執行),請繼續進行建立容器登錄作業中的 Docker 登入步驟。

    fn create context oci-cloud --provider oracle
    fn use context oci-cloud
    fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID
    fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com
    fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
    

    注意:在本教學課程中,如果您使用不同的區域,請使用巴西東部 (聖保羅) 區域,您必須變更 api-urlregistry 位置。

    T9_1

  3. 建立簡單的 Hello-world 函數,以確保您的所有設定值都正確。

    fn init --runtime python fn-recep-avro
    cd fn-recep-avro
    fn deploy --app MyReceptionApp
    fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

  4. 取得 fn-recep-avro.zip 檔案中的 AVRO 函數範例程式碼,然後取代我們先前建立的 hello-world 程式碼。您必須同時取得 func.pyrequirements.txt 這兩個檔案,才能正常運作。

    # Check you have the right code for func.py & requirements.txt (you got from zip file)
    ls -lrt
    

    T9_1

  5. 建置新程式碼並部署函數

    fn deploy --app MyReceptionApp
    

    T9_1

  6. 若要呼叫函數,我們需要傳送 AVRO 訊息作為參數,因此將使用先前步驟建立的範例 AVRO 訊息檔案。第一次呼叫函數時會花費較長的時間,因為該函數需要啟動。

    # Check where you created the sample avro message file
    ls -lrt ../create_avro_sample/
    
    # Invoke the function to check if it's working as expected
    echo -n -e "$(cat ../create_avro_sample/meu_file.bin)" | fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

作業 10:建立 API 閘道以顯示函數

  1. 在您的主控台中,依序按一下開發人員服務API 管理底下的閘道,然後按一下建立閘道

    T10_1

  2. 建立之後,請按一下部署選項,然後按一下建立部署

    名稱: RecepFunction
    路徑前置碼: /

    • 在「認證」中,選擇無認證,因為這是簡單的實驗室,而且未實行任何 API 認證。此處的主要目標是展示透過 API 傳送二進位 AVRO 訊息的 HTTPS 呼叫,以及針對此實驗室的目的,我們不會對此簡單實驗室實行任何認證方法。
    • 在邁向實際使用環境之前,請務必遵循 API 閘道的安全最佳做法。
    • 如需詳細資訊,請參閱保護 API 閘道與資源

    路由 1: 路徑: /

    Methos: POST
    後端類型: Oracle 函數
    應用程式:選取您的函數

    T9_1

    T9_1

    T9_1

    T9_1

  3. 檢查 API 閘道端點並注意。

    T9_1

  4. 開啟您的 Linux Shell 終端機,然後呼叫 API 閘道。請使用先前步驟中正確的端點取代 API URL

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
         -d "$(echo -n -e "$(cat meu_file.bin)")" \
         https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T9_1

檢查點

T9_1

作業 11:為 API 類型 A 建置容器映像檔

注意API 類型 A 和 B 程式碼基本上與只使用不同的標頭訊息相同,以便模擬兩個不同的 API

  1. 從 API type A 取得程式碼,並將它解壓縮到您的 Linux Shell 終端機 api-avro-sample_a.zip 中。

  2. 取得您先前步驟的容器登錄命名空間,然後依照下方樣式建立應用程式登錄位置。ocir url 會根據您的區域而定,例如 gru.ocir.io 代表 Brasil East (SaoPaulo)

    [ocir url]/[ 您的命名空間 ]/api-avro-sample_a:最新

  3. 在 Linux Shell 終端機中,建置並推送此 API 的 docker 映像檔。

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
    

    T10_1 T10_1

作業 12:為 API 類型 B 建立容器映像檔

  1. 從 API type B 取得程式碼,並將它解壓縮到您的 Linux Shell 終端機 api-avro-sample_b.zip 中。

  2. 取得您先前步驟的容器登錄命名空間,然後依照下方樣式建立應用程式登錄位置。ocir url 會根據您的區域而定,例如 gru.ocir.io 代表 Brasil East (SaoPaulo)

    [ocir url]/[ 您的命名空間 ]/api-avro-sample_b:最新

  3. 在 Linux Shell 終端機中,建置並推送此 API 的 docker 映像檔。

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest
    

    T10_1

    T10_1

  4. 如果已順利推送映像檔,請檢查您的容器登錄頁面。

    T10_1

作業 13:在容器服務中部署 API

  1. 前往開發人員服務容器執行處理,然後按一下建立容器執行處理

    T13_1

    T13_1

  2. api-type-b 重複步驟 1,並為 TYPE B API 選取正確的影像。

    1. 前往開發人員服務容器執行處理,然後按一下建立容器執行處理,然後重複部署 API 類型 B 的步驟

    2. 從您的容器執行處理取得內部 FQDN 位址

      T14_1

      • 按一下容器執行處理,並記下每個內部 FQDN 位址

      T14_1

    3. 前往識別與安全,按一下保存庫,選取您的 VAULT,然後建立兩個新的加密密碼

      加密密碼名稱
      API_TYPE_A_URL 貼上 API 類型 A 的內部 FQDN 專用位址
      API_TYPE_B_URL 貼上 API 類型 B 的內部 FQDN 專用位址

      記下每個加密密碼 OCID

      您的保存庫現在應該如下所示:

      T14_1

    4. 編輯上傳到 config 儲存體儲存桶的 config.properties 檔案,並新增加密密碼 OCID 的新項目

      ls -lrt config.properties
      vi config.properties
      api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID
      api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID
      
      # After save, please upload the new version to Object Storage
      cat config.properties
      oci os object put -bn config --file config.properties --force
      

      檔案應顯示如下:
      T14_1

      T14_1

作業 14:使用 create_avro_sample.py 測試 API

  1. 請前往您的 Linux Shell 終端機,此終端機會從 Task 7 儲存 create_avro_sample.py ,並建立一些用於測試 API 呼叫的新訊息。我們正在建立具有不同 ID 的兩個新的 AVRO 檔案 (1010 a 1020),我們將用來作為 Spark Stream (DataFlow) 程式內的篩選條件。

    ls -lrt
    python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}'
    
    python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
    
    

    T14_1

  2. 呼叫傳送 AVRO 訊息以進行測試的 API 運作正常。前往您的容器執行處理頁面,然後取得每個 API api-type-aapi-type-b內部 FQDN 位址。請記得從您的 API 取代下方相對應內部 FQDN 位址的 URL。

    ls -lrt type*
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_a_message.bin" \
       xxx.xx.x.xxx
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_b_message.bin" \
       xxx.xxx.xx.xxx
    
    

    T14_1

作業 15:設定 Java Spark 串流處理應用程式

  1. 前往儲存桶,按一下建立儲存桶,然後建立兩個名為 dataflow-app-avrodataflow-logs-avro 的新儲存桶,這將會用來上傳您的 java 應用程式。

  2. 請再次檢查您的 java 環境版本。

    Java

    java 11.0.8 2020-07-14 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
    

    Maven 版本

    Apache Maven 3.5.4 (Red Hat 3.5.4-5)
    Maven home: /usr/share/maven
    Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64
    Default locale: en_US, platform encoding: ANSI_X3.4-1968
    OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
    
  3. 下載範例程式碼,並將它解壓縮到您有 oci-cli、docker、java 以及 maven 的本機環境中: spark-consume-avro-message.zip

    unzip spark-consume-avro-message.zip
    cd spark-consume-avro-message
    ls -lrt
    

    T15_1 T15_1

    深入瞭解代理主機程式碼,呼叫容器執行處理類型 AB

    檢查主程式檔案 .src/main/java/example/Example.java.... T15_1

  4. 因為這個 Java 程式使用程式庫來處理 spark-avro,所以我們需要封裝相依性才能將它傳送到資料流程。因此,如果您需要更多詳細資訊,請使用資料流程相依性封裝器,請前往資料流程相依性封裝器

    套裝軟體 org.apache.spark:spark-avro_2.12:3.2.1 已經在 packages.txt 檔案宣告,您只需要執行以下動作即可封裝它:

    docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow  --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
    

    T15_1 T15_1 T15_1

  5. 使用 oci-cli,將 archive.zip 檔案上傳到名為 dataflow-app-avro 的儲存體儲存桶。

    oci os object put -bn dataflow-app-avro --file archive.zip --force
    
  6. 編譯、封裝 java 應用程式,然後上傳到儲存的儲存桶 dataflow-app-avro

    ls -lrt
    mvn clean install
    

    T15_1
    ... 減少編譯日誌的行數 ... T15_1

    # upload the JAR file to the storage bucket
    oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
    

    T15_1

  7. 檢查您目前的 dataflow-app-avro 儲存儲存的儲存桶,並且確定其外觀。

    T15_1

  8. 前往分析與 AI ,然後在資料湖底下按一下資料流程,選取左側功能表專用端點,然後按一下建立專用端點

    • 我們使用容器執行處理和 OCI 串流集區的 PRIVATE 子網路,因此需要有專用端點。

    • 確定您是從 OCI 容器執行處理和 OCI 串流集區,在內部 FQDN 填入 DNS 區域 (使用逗號區隔)。

      T15_1

  9. 前往分析與 AI ,然後在資料湖底下按一下資料流程,然後按一下建立應用程式

    T15_1
    T15_1
    T15_1
    T15_1
    T15_1
    T15_1

    • 建立之後,請選取 spark-lab-avro 資料流程,然後按一下執行 (Run) 啟動程式,啟動程式通常需要 8 分鐘。

      T15_1
      T15_1

  10. 檢查「執行中」資料流程應用程式,然後開啟將顯示目前工作且應用程式正在運作的 SparkUI。

    T15_1

    T15_1

    T15_1

作業 16:驗證流程

呼叫函數並傳送訊息以檢查所有流程是否如預期般運作。

  1. 開啟您的 Linux shell 終端機,您可以在該終端機中建立範例訊息 type_a_message.bintype_b_message.bin ,然後傳送訊息。請使用您從 API 閘道建立取得的正確端點取代 API URL

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
       -d "$(echo -n -e "$(cat type_a_message.bin)")" \
       https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T16_1

  2. 檢查容器執行處理上的日誌,檢查是否呼叫 API 類型 A

    T16_1 T16_1

您可以重複此程序並傳送一個 type_b_message.bin 檔案,然後它就會呼叫 B 容器執行處理類型。

T9_1

確認

其他學習資源

探索 docs.oracle.com/learn 的其他實驗室,或者存取更多 Oracle Learning YouTube 頻道上的免費學習內容。此外,請瀏覽 education.oracle.com/learning-explorer 以成為 Oracle Learning 檔案總管。

如需產品文件,請造訪 Oracle Help Center