注意:
- 此教學課程需要存取 Oracle Cloud。若要註冊免費帳戶,請參閱開始使用 Oracle Cloud Infrastructure Free Tier 。
- 它會使用 Oracle Cloud Infrastructure 證明資料、租用戶及區間的範例值。完成實驗室時,請將這些值替代為您雲端環境特定的值。
使用 Oracle Cloud Infrastructure Streaming 和 OCI 資料流程以及微批次處理串流 AVRO 訊息
簡介
在現今的資料導向環境中,處理和分析即時資料串流的能力,對於希望獲得洞察力並快速因應不斷變化狀況的企業來說至關重要。串流資料處理技術已成為處理大量連續資料串流的熱門解決方案。在本教學課程中,我們將探索一種創新方法,以 Oracle Cloud Infrastructure (OCI) Streaming 有效率地串流 AVRO 訊息,再加上微批次處理技術,並根據開源 FN 專案使用無伺服器功能增強 Oracle Functions 的功能。
AVRO 和串流資料簡介
AVRO 是一種廣泛採用的資料序列化格式,其效率可代表資料結構的複雜性以及與各種程式設計語言的相容性。與串流技術整合後,AVRO 能讓組織幾乎即時傳輸和處理資料,進而擷取寶貴的洞察分析,而不會發生與批次處理相關的延遲。
OCI Streaming:強化即時資料
Oracle Cloud Infrastructure (OCI) 提供一組工具來處理雲端中的資料,OCI Streaming 則是針對高傳輸量即時資料串流量身打造的這類服務。藉由運用 OCI Streaming,開發人員可以建構可有效率地擷取、處理及分配資料串流的可擴展且可靠的資料管線。
OCI 資料流程管理的 Spark:無鎖定式解決方案
Oracle Cloud Infrastructure (OCI) Data Flow 是一個完全託管的 Apache Spark 服務,可在極大型的資料集上執行處理工作,而不需要部署或管理基礎架構。
微批次處理微批次處理涉及使用時間或大小作為條件,將內送資料串流細分為精簡批次。接著會將這些批次處理為較小的工作。與串流處理中記錄的常數和個別處理不同,微批次處理會在處理前導入一些延遲和儲存,讓使用者能夠更進一步控制要如何處理資料。與以間隔處理大數據集的傳統批次處理不同,微批次處理提供了近乎即時的處理和交付結果。
解除鎖定 Synergy:OCI Streaming、OCI Data Flow 和 Oracle Functions
本教學課程深入解說 OCI Streaming、OCI Data Flow 管理的 Spark Streaming 和 Oracle Functions 的融合。我們將引導您完成設定端對端串流資料管線以擷取 AVRO 編碼的訊息、使用 OCI Data Flow 託管的 Spark 微批次處理功能有效地處理這些訊息,並導入 Oracle Functions 的無伺服器事件導向處理。
目標
使用 OCI Streaming 和 OCI Data Flow 管理的 Spark 微批次處理,以使用 AVRO 格式建立有效率的即時資料處理管線。
重要事項:本教學課程僅針對教育和研究目的而設計。它提供一個讓學員實驗的環境,並在受控制的環境中取得實際的體驗。請注意,本實驗室使用的安全組態和措施可能不適合實際情況。
實際應用系統的安全考量通常都較為複雜且動態。因此,在實作生產環境中示範的任何技術或組態之前,進行全面性的安全評估和檢閱至關重要。此複查應涵蓋安全的所有層面,包括存取控制、加密、監督與合規,以確保系統符合組織的安全原則和標準。
從實驗室環境轉換到實際部署時,安全性永遠是首要任務。
處理流程
高層次架構
先決條件 - Oracle Cloud Infrastructure
- 具有管理層次存取權限的 Oracle 帳戶
- 用以建立資源的區間:注意 COMPARMENT ID
- 含有兩個子網路 (專用和公用) 的 VCN,請參閱建立網路教學課程
- 請確定您的子網路對專用子網路和公用子網路的服務閘道和連接埠 443 和 80 均有適當的傳入規則
先決條件 - 本機機器環境
-
PRIVATE 子網路上的 Oracle Linux 運算執行處理。這對於存取 PRIVATE 子網路上的資源很重要,例如將在本教學課程中部署的 OCI Streaming、Functions 以及容器執行處理。
-
連線至 Oracle Linux 運算執行處理並執行教學課程作業的堡壘主機。如需詳細資訊,請參閱堡壘主機簡介。
-
本機 OCI-CLI 設定。如需詳細資訊,請參閱 Installing the CLI tutorial 。
-
如果您使用 Oracle Linux,本機 DOCKER 可建置映像檔,請參閱 Install Docker on Oracle Linux 。
-
測試時安裝的本機 Python 版本 3.9.16 (至少)
-
本機 Java JDK 11.0.8
-
本機 Maven 3.5.4
-
可將函數部署至 OCI 的本機 FN CLI:安裝 FN CLI
作業 1:設定動態群組
-
前往您的網域,按一下動態群組並建立下列群組。
群組名稱:MyFunctions
ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
群組名稱:ContainerIntances
ALL {resource.type='compute-container-instances', resource.compartment.id = 'pasteYourCompartmentID'}
群組名稱:DataFlowDynamicGroup
ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
作業 2:建立原則
-
前往原則並建立下列原則。
原則名稱:FunctionsPolicies
Allow dynamic-group MyFunctions to manage objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group ContainerIntances to manage objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group MyFunctions to read secret-bundles in compartment YOUR-COMPARTMENT-NAME
原則名稱:StreamTopicPolicies
Allow dynamic-group ContainerIntances to read objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group ContainerIntances use stream-push in compartment YOUR-COMPARTMENT-NAME
原則名稱:ContainerInstancesPolicy
Allow dynamic-group ContainerIntances to read repos in compartment YOUR-COMPARTMENT-NAME
原則名稱:DataFlowPolicies
Allow dynamic-group DataFlowDynamicGroup to manage objects in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to manage data-catalog-metastores in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to manage object-family in compartment YOUR-COMPARTMENT-NAME Allow dynamic-group DataFlowDynamicGroup to read secret-bundles in compartment YOUR-COMPARTMENT-NAME Allow service dataflow to read objects in compartment YOUR-COMPARTMENT-NAME
作業 3:建立儲存的儲存桶及上傳 AVRO 綱要
-
前往儲存桶,按一下建立儲存桶,然後建立一個名為 avro-schema-bucket 的新儲存桶,以儲存 AVRO 綱要檔案。
-
現在,選取您的儲存桶,並在命名空間中標註,之後將需要使用該儲存桶。
-
將檔案 user.asvc 上傳至此建立的儲存桶。
作業 4:建立專用 OCI 串流主題
-
前往分析和 AI ,接著按一下串流,然後建立一個名為 FrontDoorTopic 的新串流。
-
選取串流集區,按一下 PrivatePool ,然後按一下 Kafka 連線設定值選項和欄位的注意事項,我們之後將需要它。
作業 5:建立 AUTH TOKEN
為您的使用者建立 AUTH TOKEN,此為使用 Kafka 主題的必要項目
-
按一下右上角的使用者圖示,然後選取使用者設定值選項。
-
按一下認證權杖,然後產生新的權杖和您權杖的安全注意事項。
作業 6:建立容器登錄
-
前往開發人員服務功能表,按一下容器登錄並建立下列專用儲存區域。
儲存區域名稱 類型 api-avro-sample_a 私人 api-avro-sample_b 私人 fn-recep-avro 鍵 私人 -
檢查命名空間的儲存區域和警告。
-
開啟已安裝 OCI CLI 與 Docker 的終端機 Shell,然後繼續登入登錄。請檢查您 REGION 的正確 URL。在本教學課程中,我們使用巴西東部 (聖保羅) ,其中登錄 URL 為 gru.ocir.io 。
docker login gru.ocir.io Username: <your container namespace>/youruser Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
作業 7:建立 OCI 保存庫
建立 OCI 保存庫,並且提供此教學課程之後將使用的必要變數。
-
前往識別與安全,按一下保存庫,然後按一下建立保存庫。
-
選取新保存庫,然後為其建立主要加密金鑰。
-
建立一個稱為 AUTH_KEY 的新加密密碼,然後貼上稍早建立的認證金鑰。
-
重複建立加密密碼處理作業並建立下列新加密密碼:
變數名稱 值 KAFKA_BOOTSTRAPSERVER 「OCI Streaming 組態的大幅增加伺服器」 KAFKA_TOPIC “FrontDoorTopic” KAFKA_USERNAME 「您的 OCI Streaming 組態使用者名稱 + 串流 ID」 AUTH_KEY 「您先前步驟建立的 AUTH 權杖」 -
注意為每個加密密碼建立的加密 OCID ,並建立新的組態檔。
-
config.properties 檔案包含從應用程式到保存庫加密密碼 OCID 的對應變數。應用程式將使用此檔案來識別在程式實際執行期間需要收集哪些保存庫加密密碼。
-
在您的本機機器上建立可存取 OCI-CLI 的新檔案:
以您的 OCID 取代每秒
檔案名稱:config.propertieskafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
-
-
前往儲存桶,按一下建立儲存桶,然後建立一個名為 config 的新儲存桶,以儲存
config.properties
檔案。 -
將 config.properties 檔案上傳到儲存的儲存桶 config
ls -lrt config.properties oci os object put -bn config --file config.properties --force
作業 8:建立簡單的 AVRO 訊息,並使用提供的範例 python 程式碼將訊息儲存至檔案
-
開啟已安裝 OCI CLI、Docker 和 Python3 的 Shell 終端機,然後根據先前上傳至物件儲存中的相同 AVRO 綱要,建立包含單一訊息的新 AVRO 檔案。
注意:您必須檢查 Python 版本,在此教學課程中使用 Python 3.9.16 ,較舊的版本可能無法使用。
-
請從此處取得驗證碼:Create_avro_sample.zip 。
-
將其解壓縮至您選擇的位置,然後執行程式以產生範例 AVRO 訊息:
cd ~ mkdir create_avro_sample cd create_avro_sample unzip CreateAVRO_SampleFile.zip # Check the files are there ls -lrt # install the python dependencies on requirements.txt pip3 install -r requirements.txt # Run the program and create an AVRO message file python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
作業 9:建立 OCI 函數以接收 AVRO 訊息並發布至 OCI Streaming 主題
-
前往開發人員服務的函數底下,按一下應用程式,然後按一下建立應用程式。
-
請前往已安裝 Docker、OCI CLI、FN CLI 的終端機 Shell,然後執行下列命令來初始化函數。
注意:如果您遵循這些步驟,您的 Docker 登入命令現在已經執行 (如果尚未執行),請繼續進行建立容器登錄作業中的 Docker 登入步驟。
fn create context oci-cloud --provider oracle fn use context oci-cloud fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
注意:在本教學課程中,如果您使用不同的區域,請使用巴西東部 (聖保羅) 區域,您必須變更 api-url 和 registry 位置。
-
建立簡單的 Hello-world 函數,以確保您的所有設定值都正確。
fn init --runtime python fn-recep-avro cd fn-recep-avro fn deploy --app MyReceptionApp fn invoke MyReceptionApp fn-recep-avro
-
取得 fn-recep-avro.zip 檔案中的 AVRO 函數範例程式碼,然後取代我們先前建立的 hello-world 程式碼。您必須同時取得 func.py 和 requirements.txt 這兩個檔案,才能正常運作。
# Check you have the right code for func.py & requirements.txt (you got from zip file) ls -lrt
-
建置新程式碼並部署函數
fn deploy --app MyReceptionApp
-
若要呼叫函數,我們需要傳送 AVRO 訊息作為參數,因此將使用先前步驟建立的範例 AVRO 訊息檔案。第一次呼叫函數時會花費較長的時間,因為該函數需要啟動。
# Check where you created the sample avro message file ls -lrt ../create_avro_sample/ # Invoke the function to check if it's working as expected echo -n -e "$(cat ../create_avro_sample/meu_file.bin)" | fn invoke MyReceptionApp fn-recep-avro
作業 10:建立 API 閘道以顯示函數
-
在您的主控台中,依序按一下開發人員服務和 API 管理底下的閘道,然後按一下建立閘道。
-
建立之後,請按一下部署選項,然後按一下建立部署。
名稱: RecepFunction
路徑前置碼: /- 在「認證」中,選擇無認證,因為這是簡單的實驗室,而且未實行任何 API 認證。此處的主要目標是展示透過 API 傳送二進位 AVRO 訊息的 HTTPS 呼叫,以及針對此實驗室的目的,我們不會對此簡單實驗室實行任何認證方法。
- 在邁向實際使用環境之前,請務必遵循 API 閘道的安全最佳做法。
- 如需詳細資訊,請參閱保護 API 閘道與資源。
路由 1: 路徑: /
Methos: POST
後端類型: Oracle 函數
應用程式:選取您的函數 -
檢查 API 閘道端點並注意。
-
開啟您的 Linux Shell 終端機,然後呼叫 API 閘道。請使用先前步驟中正確的端點取代 API URL 。
cd create_avro_sample/ ls -lrt curl -X POST -H "Content-Type: application/octet-stream" \ -d "$(echo -n -e "$(cat meu_file.bin)")" \ https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
檢查點
作業 11:為 API 類型 A 建置容器映像檔
注意: API 類型 A 和 B 程式碼基本上與只使用不同的標頭訊息相同,以便模擬兩個不同的 API 。
-
從 API type A 取得程式碼,並將它解壓縮到您的 Linux Shell 終端機 api-avro-sample_a.zip 中。
-
取得您先前步驟的容器登錄命名空間,然後依照下方樣式建立應用程式登錄位置。ocir url 會根據您的區域而定,例如 gru.ocir.io 代表 Brasil East (SaoPaulo)
[ocir url]/[ 您的命名空間 ]/api-avro-sample_a:最新
-
在 Linux Shell 終端機中,建置並推送此 API 的 docker 映像檔。
ls -lrt docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
作業 12:為 API 類型 B 建立容器映像檔
-
從 API type B 取得程式碼,並將它解壓縮到您的 Linux Shell 終端機 api-avro-sample_b.zip 中。
-
取得您先前步驟的容器登錄命名空間,然後依照下方樣式建立應用程式登錄位置。ocir url 會根據您的區域而定,例如 gru.ocir.io 代表 Brasil East (SaoPaulo)
[ocir url]/[ 您的命名空間 ]/api-avro-sample_b:最新
-
在 Linux Shell 終端機中,建置並推送此 API 的 docker 映像檔。
ls -lrt docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest
-
如果已順利推送映像檔,請檢查您的容器登錄頁面。
作業 13:在容器服務中部署 API
-
前往開發人員服務、容器執行處理,然後按一下建立容器執行處理。
-
為 api-type-b 重複步驟 1,並為 TYPE B API 選取正確的影像。
-
前往開發人員服務、容器執行處理,然後按一下建立容器執行處理,然後重複部署 API 類型 B 的步驟
-
從您的容器執行處理取得內部 FQDN 位址。
- 按一下容器執行處理,並記下每個內部 FQDN 位址。
-
前往識別與安全,按一下保存庫,選取您的 VAULT,然後建立兩個新的加密密碼。
加密密碼名稱 值 API_TYPE_A_URL 貼上 API 類型 A 的內部 FQDN 專用位址 API_TYPE_B_URL 貼上 API 類型 B 的內部 FQDN 專用位址 記下每個加密密碼 OCID
您的保存庫現在應該如下所示:
-
編輯上傳到 config 儲存體儲存桶的 config.properties 檔案,並新增加密密碼 OCID 的新項目
ls -lrt config.properties vi config.properties api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID # After save, please upload the new version to Object Storage cat config.properties oci os object put -bn config --file config.properties --force
檔案應顯示如下:
-
作業 14:使用 create_avro_sample.py 測試 API
-
請前往您的 Linux Shell 終端機,此終端機會從 Task 7 儲存 create_avro_sample.py ,並建立一些用於測試 API 呼叫的新訊息。我們正在建立具有不同 ID 的兩個新的 AVRO 檔案 (1010 a 1020),我們將用來作為 Spark Stream (DataFlow) 程式內的篩選條件。
ls -lrt python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}' python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
-
呼叫傳送 AVRO 訊息以進行測試的 API 運作正常。前往您的容器執行處理頁面,然後取得每個 API api-type-a 和 api-type-b 的內部 FQDN 位址。請記得從您的 API 取代下方相對應內部 FQDN 位址的 URL。
ls -lrt type* curl -i -X POST -H "Content-Type: application/octet-stream" \ --data-binary "@type_a_message.bin" \ xxx.xx.x.xxx curl -i -X POST -H "Content-Type: application/octet-stream" \ --data-binary "@type_b_message.bin" \ xxx.xxx.xx.xxx
作業 15:設定 Java Spark 串流處理應用程式
-
前往儲存桶,按一下建立儲存桶,然後建立兩個名為 dataflow-app-avro 和 dataflow-logs-avro 的新儲存桶,這將會用來上傳您的 java 應用程式。
-
請再次檢查您的 java 環境版本。
Java
java 11.0.8 2020-07-14 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
Maven 版本
Apache Maven 3.5.4 (Red Hat 3.5.4-5) Maven home: /usr/share/maven Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64 Default locale: en_US, platform encoding: ANSI_X3.4-1968 OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
-
下載範例程式碼,並將它解壓縮到您有 oci-cli、docker、java 以及 maven 的本機環境中: spark-consume-avro-message.zip 。
unzip spark-consume-avro-message.zip cd spark-consume-avro-message ls -lrt
深入瞭解代理主機程式碼,呼叫容器執行處理類型 A 和 B 。
檢查主程式檔案 .src/main/java/example/Example.java....
-
因為這個 Java 程式使用程式庫來處理 spark-avro,所以我們需要封裝相依性才能將它傳送到資料流程。因此,如果您需要更多詳細資訊,請使用資料流程相依性封裝器,請前往資料流程相依性封裝器。
套裝軟體 org.apache.spark:spark-avro_2.12:3.2.1 已經在 packages.txt 檔案宣告,您只需要執行以下動作即可封裝它:
docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
-
使用 oci-cli,將 archive.zip 檔案上傳到名為 dataflow-app-avro 的儲存體儲存桶。
oci os object put -bn dataflow-app-avro --file archive.zip --force
-
編譯、封裝 java 應用程式,然後上傳到儲存的儲存桶 dataflow-app-avro
ls -lrt mvn clean install
... 減少編譯日誌的行數 ...# upload the JAR file to the storage bucket oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
-
檢查您目前的 dataflow-app-avro 儲存儲存的儲存桶,並且確定其外觀。
-
前往分析與 AI ,然後在資料湖底下按一下資料流程,選取左側功能表專用端點,然後按一下建立專用端點。
-
我們使用容器執行處理和 OCI 串流集區的 PRIVATE 子網路,因此需要有專用端點。
-
確定您是從 OCI 容器執行處理和 OCI 串流集區,在內部 FQDN 填入 DNS 區域 (使用逗號區隔)。
-
-
前往分析與 AI ,然後在資料湖底下按一下資料流程,然後按一下建立應用程式。
-
建立之後,請選取 spark-lab-avro 資料流程,然後按一下執行 (Run) 啟動程式,啟動程式通常需要 8 分鐘。
-
-
檢查「執行中」資料流程應用程式,然後開啟將顯示目前工作且應用程式正在運作的 SparkUI。
作業 16:驗證流程
呼叫函數並傳送訊息以檢查所有流程是否如預期般運作。
-
開啟您的 Linux shell 終端機,您可以在該終端機中建立範例訊息 type_a_message.bin 和 type_b_message.bin ,然後傳送訊息。請使用您從 API 閘道建立取得的正確端點取代 API URL 。
cd create_avro_sample/ ls -lrt curl -X POST -H "Content-Type: application/octet-stream" \ -d "$(echo -n -e "$(cat type_a_message.bin)")" \ https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
-
檢查容器執行處理上的日誌,檢查是否呼叫 API 類型 A 。
您可以重複此程序並傳送一個 type_b_message.bin 檔案,然後它就會呼叫 B 容器執行處理類型。
相關連結
確認
- 作者 - Joao Tarla (Oracle LAD A-Team 解決方案工程師)
其他學習資源
探索 docs.oracle.com/learn 的其他實驗室,或者存取更多 Oracle Learning YouTube 頻道上的免費學習內容。此外,請瀏覽 education.oracle.com/learning-explorer 以成為 Oracle Learning 檔案總管。
如需產品文件,請造訪 Oracle Help Center 。
Stream AVRO Messages with Oracle Cloud Infrastructure Streaming and OCI Data Flow with Micro-Batch processing
F86501-01
September 2023
Copyright © 2023, Oracle and/or its affiliates.