ノート:

Oracle Cloud Infrastructure StreamingとOCIデータ・フローを使用したAVROメッセージのストリーミングとマイクロバッチ処理

イントロダクション

今日のデータ主導の環境において、リアルタイムのデータ・ストリームを処理および分析する機能は、洞察を得て変化する状況に迅速に対応することを目指す企業にとって重要です。ストリーミング・データ処理技術は、大量かつ継続的なデータ・ストリームを処理するための強力なソリューションとして出現しました。このチュートリアルでは、Oracle Cloud Infrastructure (OCI) Streamingを使用して、マイクロバッチ処理技術と組み合わせて、オープンソースのFNプロジェクトに基づいたOracle Functionsのサーバーレス機能で強化されたAVROメッセージを効率的にストリーミングするための革新的なアプローチについて説明します。

AVROおよびストリーミング・データの概要

広く採用されているデータ・シリアライズ形式であるAVROは、複雑なデータ構造および様々なプログラミング言語との互換性を表す効率で知られています。ストリーミング・テクノロジと統合されている場合、AVROを使用すると、組織はほぼリアルタイムでデータを送信および処理できるため、通常はバッチ処理に関連する待機時間なしで貴重なインサイトを抽出できます。

OCIストリーミング: リアルタイム・データの強化

Oracle Cloud Infrastructure(OCI)には、クラウド内のデータを処理するための様々なツールがあります。OCI Streamingは、高スループットのリアルタイム・データ・ストリーム向けに調整されたサービスの1つです。OCIストリーミングを活用することで、開発者は、データストリームを効率的に取り込み、処理、配布するスケーラブルで信頼性の高いデータパイプラインを構築できます。

OCI Data Flowが管理するSpark: ロックインのないソリューション

Oracle Cloud Infrastructure (OCI) Data Flowは、完全に管理されたApache Sparkサービスであり、インフラストラクチャをデプロイまたは管理することなく、非常に大規模なデータセットに対して処理タスクを実行します。

マイクロバッチ処理マイクロバッチ処理では、時間またはサイズを基準として使用して、受信データ・ストリームをコンパクト・バッチに分割します。その後、これらのバッチは小規模なジョブとして処理されます。ストリーム処理でのレコードの定数および個別処理とは異なり、マイクロバッチ処理では、処理前に少しの遅延とストレージが導入され、データの処理をより詳細に制御できます。ビッグ・データ・セットを定期的に処理する従来のバッチ処理とは異なり、マイクロバッチ処理は、ほぼリアルタイムの処理と結果の提供を提供します。

シナリオのロック解除: OCIストリーミング、OCIデータ・フローおよびOracle Functions

このチュートリアルでは、OCIストリーミング、OCIデータフロー管理Sparkストリーミング、およびOracle Functionsの融合について説明します。AVROでエンコードされたメッセージを収集し、OCI Data Flowで管理されたSparkマイクロバッチ処理機能を使用して効率的に処理し、Oracle Functionsでサーバーレス・イベント駆動処理を導入する、エンドツーエンドのストリーミング・データ・パイプラインを設定するプロセスをガイドします。

目標

OCIストリーミングおよびOCIデータフロー管理Sparkマイクロバッチ処理を使用して、AVRO形式を使用した効率的でリアルタイムのデータ処理パイプラインを作成します。

重要: このチュートリアルは、教育および学習の目的でのみ設計されています。学習者が制御された設定で実践的な経験を実験して得るための環境を提供します。この演習で使用するセキュリティ構成および演習は、実際のシナリオには適していない可能性があることに注意してください。

実際のアプリケーションのセキュリティーに関する考慮事項は、多くの場合、はるかに複雑で動的です。そのため、本番環境でここに示した手法または構成を実装する前に、包括的なセキュリティ評価およびレビューを行うことが不可欠です。このレビューでは、システムが組織のセキュリティ・ポリシーおよび標準に準拠していることを確認するために、アクセス制御、暗号化、監視、コンプライアンスなど、セキュリティのすべての側面を網羅する必要があります。

演習環境から実際のデプロイメントに移行する場合、セキュリティは常に最優先事項である必要があります。

プロセス・フロー
T0_1

高レベルのアーキテクチャ
T0_1

前提条件- Oracle Cloud Infrastructure

前提条件- ローカル・マシン環境

タスク1: 動的グループの設定

  1. ドメインに移動し、「動的グループ」をクリックして次のグループを作成します。

    グループ名: MyFunctions

    ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
    

    グループ名: ContainerIntances

    ALL {resource.type='compute-container-instances',  resource.compartment.id = 'pasteYourCompartmentID'}
    

    グループ名: DataFlowDynamicGroup

    ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
    

タスク2: ポリシーの作成

タスク3: ストレージ・バケットの作成およびAVROスキーマのアップロード

  1. 「バケット」に移動して「バケットの作成」をクリックし、AVRO-schema-bucketという名前の新しいバケットを作成してAVROスキーマ・ファイルを格納します。

    T3_1

  2. バケットを選択し、ネームスペースをノートにとります。後で必要になります。

    T3_1

  3. ファイルuser.asvcをこの作成されたバケットにアップロードします。

    T3_1

タスク4: プライベートOCIストリーミング・トピックの作成

  1. 「アナリティクスとAI」に移動して「ストリーミング」をクリックし、FrontDoorTopicという新しいストリームを作成します。

    T4_0

  2. 「ストリーム・プール」を選択し、PrivatePoolをクリックしてから、「Kafka接続設定」オプションおよび「TAKEノート」フィールドをクリックします。後で必要になります。

    T4_0

タスク5: AUTH TOKENの作成

ユーザーのAUTH TOKENを作成します。これはKafkaトピックと連携するために必要です

  1. 右上のユーザー・アイコンをクリックし、「ユーザー設定」オプションを選択します。

  2. 「認証トークン」をクリックし、新しいトークンおよびトークンのTAKEノートを生成します。

    T4_1

タスク6: コンテナ・レジストリの作成

  1. 「開発者サービス」メニューに移動し、「コンテナ・レジストリ」をクリックして、次のプライベート・リポジトリを作成します。

    リポジトリ名 タイプ
    api-avro-sample_a プライベート
    api-avro-sample_b プライベート
    fn-recep-avro プライベート
  2. ネームスペースのリポジトリおよびTAKE NOTEを確認します。

    T6_1

  3. OCI CLIおよびDockerがインストールされている端末シェルを開き、レジストリでログインを続行します。REGIONの正しいURLを確認してください。このチュートリアルでは、レジストリURLがgru.ocir.ioであるブラジル東部(サンパウロ)を使用しています。

    docker login gru.ocir.io
    Username: <your container namespace>/youruser
    Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
    

    T6_1

タスク7: OCI Vaultの作成

OCI Vaultを作成し、このチュートリアルの後半で使用される必要な変数を指定します。

  1. 「識別とセキュリティ」に移動して「Vault」をクリックし、「Vaultの作成」をクリックします。

    T7_1new

  2. 新しいボールトを選択し、そのマスター暗号化キーを作成します。

    T7_1new

  3. AUTH_KEYという新しいシークレットを作成し、以前に作成した認証キーを貼り付けます。

    T7_1new

  4. シークレット作成プロセスを繰り返して、次の新しいシークレットを作成します:

    変数名 Value
    KAFKA_BOOTSTRAPSERVER "OCIストリーミング構成からのブーストラップ・サーバー"
    KAFKA_TOPIC “FrontDoorTopic”
    KAFKA_USERNAME 「自分のユーザー名+ OCIストリーミング構成からのstreampool ID」
    AUTH_KEY 「前のステップで作成したAUTHトークン」
  5. 各シークレットに作成されたシークレットOCIDをノートにとり、新しい構成ファイルを作成します。

    • config.propertiesファイルには、アプリケーションからボールト・シークレットOCIDへのマッピング変数が含まれます。アプリケーションはこのファイルを使用して、実行時に収集する必要があるボールト・シークレットを識別します。

    • OCI-CLIにアクセスできるローカル・マシンに新規ファイルを作成します。
      各SecreatのOCIDで置換
      ファイル名: config.properties

      kafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx
      kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
      
  6. 「バケット」に移動し、「バケットの作成」をクリックして、config.propertiesファイルを格納するconfigという名前の新しいバケットを作成します。

  7. config.propertiesファイルをストレージ・バケットconfigにアップロードします

    ls -lrt config.properties
    oci os object put -bn config --file config.properties --force
    

    T7_1new

タスク8: 単純なAVROメッセージを作成し、提供されたサンプルpythonコードを使用してファイルに保存する

  1. OCI CLI、DockerおよびPython3がインストールされているシェル・ターミナルを開き、前にオブジェクト・ストレージでアップロードしたのと同じAVROスキーマに基づく単一のメッセージを含む新しいAVROファイルを作成します。

    ノート: Pythonのバージョンを確認する必要があります。このチュートリアルではPython 3.9.16を使用していますが、以前のバージョンは機能しない可能性があります。

  2. Create_avro_sample.zipからコードを取得します。

  3. 選択した場所に解凍し、プログラムを実行してサンプルAVROメッセージを生成します。

    cd ~
    mkdir create_avro_sample
    cd create_avro_sample
    unzip CreateAVRO_SampleFile.zip
    # Check the files are there
    ls -lrt
    # install the python dependencies on requirements.txt
    pip3 install -r requirements.txt
    # Run the program and create an AVRO message file
    python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
    

    T8_1 T8_1 T8_1

タスク9: AVROメッセージを受信してOCIストリーミングのトピックに公開するOCIファンクションの作成

  1. 「開発者サービス」に移動して「ファンクション」の下の「アプリケーション」をクリックし、「アプリケーションの作成」をクリックします。

    T9_1

  2. Docker、OCI CLI、FN CLIがインストールされているターミナル・シェルに移動し、次のコマンドを実行してファンクションを初期化します。

    ノート: ステップに従った場合、Dockerログイン・コマンドはすでに実行されています。実行されていない場合は、コンテナ・レジストリの作成タスクのDockerログイン・ステップに進みます。

    fn create context oci-cloud --provider oracle
    fn use context oci-cloud
    fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID
    fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com
    fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
    

    ノート: このチュートリアルでは、ブラジル東部(サンパウロ)リージョンを使用しています。別のリージョンを使用している場合は、api-urlおよびregistryの場所を変更する必要があります。

    T9_1

  3. 単純なHello-world関数を作成して、すべての設定が正しいことを確認します。

    fn init --runtime python fn-recep-avro
    cd fn-recep-avro
    fn deploy --app MyReceptionApp
    fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

  4. ファイル fn-recep-AVRO.zip内のAVRO関数のサンプルコードを取得し、以前に作成したhello-worldコードを置き換えます。動作するためには、両方のファイル func.pyrequirements.txtを取得する必要があります。

    # Check you have the right code for func.py & requirements.txt (you got from zip file)
    ls -lrt
    

    T9_1

  5. 新しいコードの作成とファンクションのデプロイ

    fn deploy --app MyReceptionApp
    

    T9_1

  6. ファンクションを呼び出すには、AVROメッセージをパラメータとして渡す必要があります。そのため、前述のステップで作成したサンプルAVROメッセージ・ファイルを使用します。ファンクションを初めて呼び出すと、起動する必要があるため、少し時間がかかります。

    # Check where you created the sample avro message file
    ls -lrt ../create_avro_sample/
    
    # Invoke the function to check if it's working as expected
    echo -n -e "$(cat ../create_avro_sample/meu_file.bin)" | fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

タスク10: ファンクションを公開するためのAPIゲートウェイの作成

  1. コンソールで「開発者サービス」をクリックし、「API管理」「ゲートウェイ」をクリックし、「ゲートウェイの作成」をクリックします。

    T10_1

  2. 作成したら、「デプロイメント」オプションをクリックし、「デプロイメントの作成」をクリックします。

    名前: RecepFunction
    パス接頭辞: /

    • 「認証」では、これは単純な演習であり、API認証が実装されていないため、「認証なし」を選択します。ここでの主な目的は、APIを介してバイナリAVROメッセージを渡すHTTPSコールを示すことであり、この演習の目的では、この単純な演習の認証方法を実装しません。
    • 実際のライフ環境に前進する前に、APIゲートウェイのセキュリティのベスト・プラクティスに従ってください。
    • 詳細は、APIゲートウェイおよびリソースの保護を参照してください。

    ルート1: パス: /

    メトス: POST
    バックエンド・タイプ: Oracle関数
    アプリケーション:ファンクションを選択します

    T9_1

    T9_1

    T9_1

    T9_1

  3. APIゲートウェイ・エンドポイントを確認してノートにとります。

    T9_1

  4. Linuxシェル・ターミナルを開き、APIゲートウェイをコールします。API URLを、前のステップで取得した正しいエンドポイントに置き換えます

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
         -d "$(echo -n -e "$(cat meu_file.bin)")" \
         https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T9_1

チェックポイント

T9_1

タスク11: APIタイプAのコンテナ・イメージの構築

ノート: 2つの異なるAPIをシミュレートするために、APIタイプAおよびBコードは基本的に異なるヘッダー・メッセージでのみ同じです

  1. API type Aからコードを取得し、Linuxシェル端末 api-avro-sample_a.zipで解凍します。

  2. 前のステップで取得したコンテナ・レジストリ・ネームスペースを取得して、次のパターンに従ってアプリケーション・レジストリの場所を作成します。ocir urlは、お使いのリージョン(つまり、Brasil East(SaoPaulo)のgru.ocir.io)に基づきます

    [ocir url]/[ネームスペース]/api-avro-sample_a:latest

  3. Linuxシェル・ターミナルで、このAPI用のdockerイメージを構築およびプッシュします。

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
    

    T10_1 T10_1

タスク12: APIタイプBのコンテナ・イメージの構築

  1. API type Bからコードを取得し、Linuxシェル端末 api-avro-sample_b.zipで解凍します。

  2. 前のステップで取得したコンテナ・レジストリ・ネームスペースを取得して、次のパターンに従ってアプリケーション・レジストリの場所を作成します。ocir urlは、お使いのリージョン(つまり、Brasil East(SaoPaulo)のgru.ocir.io)に基づきます

    [ocir url]/[ネームスペース]/api-avro-sample_b:latest

  3. Linuxシェル・ターミナルで、このAPI用のdockerイメージを構築およびプッシュします。

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest
    

    T10_1

    T10_1

  4. イメージが正常にプッシュされた場合は、コンテナ・レジストリ・ページを確認してください。

    T10_1

タスク13: コンテナ・サービスでのAPIのデプロイ

  1. 「開発者サービス」「コンテナ・インスタンス」に移動して、「コンテナ・インスタンスの作成」をクリックします。

    T13_1

    T13_1

  2. API-type-bに対してステップ1を繰り返し、TYPE B APIに正しいイメージを選択します。

    1. 「開発者サービス」「コンテナ・インスタンス」に移動して「コンテナ・インスタンスの作成」をクリックし、APIタイプBをデプロイするステップを繰り返します

    2. コンテナ・インスタンスから内部FQDNアドレスを取得します。

      T14_1

      • コンテナ・インスタンスをクリックし、各内部FQDNアドレスをノートにとります。

      T14_1

    3. 「識別と秘密」に移動して「VAULT」をクリックし、VAULTを選択して新しいシークレットを2つ作成します。

      シークレット名 Value
      API_TYPE_A_URL APIタイプAの内部FQDNプライベート・アドレスの貼付け
      API_TYPE_B_URL APIタイプBの内部FQDNプライベート・アドレスの貼付け

      各シークレットOCIDをノートにとります

      ボールトは次のようになります:

      T14_1

    4. configストレージ・バケットにアップロードしたconfig.propertiesファイルを編集し、シークレットOCIDの新しいエントリを追加します

      ls -lrt config.properties
      vi config.properties
      api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID
      api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID
      
      # After save, please upload the new version to Object Storage
      cat config.properties
      oci os object put -bn config --file config.properties --force
      

      ファイルは次のようになります。
      T14_1

      T14_1

タスク14: create_avro_sample.pyを使用したAPIのテスト

  1. タスク7からcreate_avro_sample.pyを保存したLinuxシェル端末に移動し、APIコールをテストするための新しいメッセージを作成します。Spark Stream (DataFlow)プログラム内でフィルタとして使用する、異なるID (1010 a 1020)の2つの新しいAVROファイルを作成しています。

    ls -lrt
    python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}'
    
    python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
    
    

    T14_1

  2. AVROメッセージを渡してテストが正常に動作しているAPIをコールします。「コンテナ・インスタンス」ページに移動し、API api-type-aおよびapi-type-bのそれぞれについて内部FQDNアドレスを取得します。APIの対応する内部FQDNアドレスの次のURLを必ず置き換えてください。

    ls -lrt type*
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_a_message.bin" \
       xxx.xx.x.xxx
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_b_message.bin" \
       xxx.xxx.xx.xxx
    
    

    T14_1

タスク15: Java Sparkストリーミング・アプリケーションの設定

  1. 「バケット」に移動して「バケットの作成」をクリックし、dataflow-app-avroおよびdataflow-logs-avroという2つの新しいバケットを作成します。これは、javaアプリケーションのアップロードに使用されます。

  2. java環境のバージョンを再確認してください。

    Java

    java 11.0.8 2020-07-14 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
    

    Maven

    Apache Maven 3.5.4 (Red Hat 3.5.4-5)
    Maven home: /usr/share/maven
    Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64
    Default locale: en_US, platform encoding: ANSI_X3.4-1968
    OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
    
  3. サンプル・コードをダウンロードし、oci-cli、docker、javaおよびmaven (spark-consume-avro-message.zip)があるローカル環境で解凍します。

    unzip spark-consume-avro-message.zip
    cd spark-consume-avro-message
    ls -lrt
    

    T15_1 T15_1

    コンテナ・インスタンス・タイプAおよびBをコールするためのプロキシ・コードを少し詳しく説明します。

    メイン・プログラム・ファイル.src/main/java/example/Example.javaの確認.... T15_1

  4. このJavaプログラムはlibを使用してspark-avroを処理するため、depedencyをパッケージ化してデータフローに渡す必要があります。そのため、データ・フロー依存性パッケージャを使用します。詳細が必要な場合は、データ・フロー依存性パッケージャに移動できます。

    パッケージ org.apache.spark:spark-avro_2.12:3.2.1はすでに packages.txtファイルで宣言されているため、次を実行してパッケージ化する必要があります。

    docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow  --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
    

    T15_1 T15_1 T15_1

  5. oci-cliを使用して、archive.zipファイルをdataflow-app-avroというストレージ・バケットにアップロードします。

    oci os object put -bn dataflow-app-avro --file archive.zip --force
    
  6. javaアプリケーションをコンパイル、パッケージ化して、ストレージ・バケットdataflow-app-avroにアップロードします

    ls -lrt
    mvn clean install
    

    T15_1
    ...コンパイル・ログの行数が削減されました... T15_1

    # upload the JAR file to the storage bucket
    oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
    

    T15_1

  7. 現在のdataflow-app-avroストレージ・バケットを確認し、次のようになっていることを確認します。

    T15_1

  8. 「アナリティクスとAI」に移動して、「データ・レイク」「データ・フロー」をクリックし、左側のメニューの「プライベート・エンドポイント」を選択して「プライベート・エンドポイントの作成」をクリックします。

    • プライベート・エンドポイントは、コンテナ・インスタンスおよびOCIストリーミング・プールにPRIVATEサブネットを使用しているため必要です。

    • DNSゾーンにOCIコンテナ・インスタンスからの内部FQDNとOCIストリーミング・プールをカンマ区切りで入力してください。

      T15_1

  9. 「アナリティクスとAI」に移動して、「データ・レイク」「データ・フロー」をクリックし、「アプリケーションの作成」をクリックします。

    T15_1
    T15_1
    T15_1
    T15_1
    T15_1
    T15_1

    • 作成後、spark-lab-avroデータフローを選択し、「実行」をクリックしてプログラムを起動します。通常、起動には最大8分かかります。

      T15_1
      T15_1

  10. 「Running dataflow」アプリケーションを確認し、現在のジョブとアプリケーションが動作していることを示すSparkUIを開きます。

    T15_1

    T15_1

    T15_1

タスク16: フローの検証

関数を呼び出してメッセージを渡し、すべてのフローが期待どおりに動作していることを確認します。

  1. サンプルメッセージ type_a_message.bintype_b_message.binを作成した Linuxシェル端末を開き、メッセージを送信します。API URLを、APIゲートウェイの作成から取得した正しいエンドポイントに置き換えます

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
       -d "$(echo -n -e "$(cat type_a_message.bin)")" \
       https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T16_1

  2. コンテナ・インスタンスのログを確認して、APIタイプAがコールされたかどうかを確認します。

    T16_1 T16_1

プロセスを繰り返して type_b_message.binファイルを送信すると、タイプ Bコンテナインスタンスが呼び出されます。

T9_1

謝辞

その他の学習リソース

docs.oracle.com/learnで他のラボをご覧いただくか、Oracle Learning YouTubeチャネルでより無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。

製品ドキュメントについては、Oracle Help Centerを参照してください。