ノート:

Apache Spark StreamingでOCIデータ・フローを使用して、スケーラブルでほぼリアルタイムのアプリケーションでKafkaトピックを処理します

イントロダクション

Oracle Cloud Infrastructure (OCI) Data Flowは、Apache Sparkというオープンソース・プロジェクトの管理対象サービスです。基本的に、Sparkでは、大規模処理ファイル、ストリーミングおよびデータベース操作に使用できます。非常に高いスケーラブルな処理でアプリケーションを構築できます。Sparkでは、クラスタ化されたマシンをスケーリングして使用し、最小構成でジョブを解析できます。

Sparkを管理対象サービス(データ・フロー)として使用すると、多くのスケーラブル・サービスを追加して、クラウド処理能力を乗算できます。データ・フローには、Sparkストリーミングを処理する機能があります。

ストリーミング・アプリケーションでは、24時間を超えることが多く、数週間または数か月間続く長期間の継続実行が必要です。予期しない障害が発生した場合、ストリーミング・アプリケーションは、誤った計算結果を生成せずに障害発生時点から再起動する必要があります。データ・フローは、Spark構造化ストリーミング・チェックポイントに依存して、オブジェクト・ストレージ・バケットに格納できる処理済オフセットを記録します。

ノート:データをバッチ戦略として処理する必要がある場合は、次の記事をお読みください: Oracle Cloud Infrastructure Data FlowでのAutonomous DatabaseおよびKafkaでの大規模ファイルの処理

dataflow-use-case.png

このチュートリアルでは、データ・ボリューム・ストリーミングの処理、データベースの問合せおよびデータのマージ/結合に使用される最も一般的なアクティビティを確認して、メモリー内の別の表を形成したり、データをほぼリアルタイムで任意の宛先に送信できます。この大量のデータをデータベースおよびKafkaキューに書き込むことができ、非常に低コストで非常に効果的なパフォーマンスを実現できます。

目的

前提条件

タスク1: Object Storage構造の作成

Object Storageは、デフォルトのファイル・リポジトリとして使用されます。他のタイプのファイル・リポジトリを使用できますが、Object Storageは、パフォーマンスに優れたファイルを操作するためのシンプルで低コストの方法です。このチュートリアルでは、両方のアプリケーションがオブジェクト・ストレージから大きなCSVファイルをロードし、Apache Sparkが高速で大量のデータを処理するスマートさを示します。

  1. コンパートメントの作成: クラウド・リソースを編成および分離するには、コンパートメントが重要です。IAMポリシーによってリソースを分離できます。

    • このリンクを使用して、コンパートメントのポリシーを理解および設定できます: コンパートメントの管理

    • 1つのコンパートメントを作成して、このチュートリアルの2つのアプリケーションのすべてのリソースをホストします。analyticsという名前のコンパートメントを作成します。

    • Oracle Cloudのメイン・メニューに移動し、「アイデンティティとセキュリティ」「コンパートメント」を検索します。「コンパートメント」セクションで、「コンパートメントの作成」をクリックし、名前を入力します。 create-compartment.png

      ノート: ユーザーのグループへのアクセス権を付与し、ユーザーを含める必要があります。

    • 「コンパートメントの作成」をクリックして、コンパートメントを含めます。

  2. オブジェクト・ストレージでのバケットの作成: バケットはオブジェクトを格納するための論理コンテナであるため、このデモに使用されるすべてのファイルはこのバケットに格納されます。

    • Oracle Cloudのメイン・メニューに移動し、「ストレージ」および「バケット」を検索します。「バケット」セクションで、前に作成したコンパートメント(分析)を選択します。

      選択- compartment.png

    • 「バケットの作成」をクリックします。4つのバケットを作成します: apps、data、dataflow-logs、Wallet

      create-bucket.png

    • これらの4つのバケットとともにバケット名情報を入力し、デフォルトの選択で他のパラメータを保守します。

    • バケットごとに、「作成」をクリックします。作成されたバケットを確認できます。

      バケット-dataflow.png

ノート:バケットのIAMポリシーを確認します。デモ・アプリケーションでこれらのバケットを使用する場合は、ポリシーを設定する必要があります。オブジェクト・ストレージの概要およびIAMポリシーで、概念および設定を確認できます。

タスク2: Autonomous Databaseの作成

Oracle Cloud Autonomous Databaseは、Oracle Databaseの管理対象サービスです。このチュートリアルでは、アプリケーションはセキュリティ上の理由からWalletを介してデータベースに接続します。

ノート: Autonomous DatabaseにアクセスするためのIAMポリシーを確認します(Autonomous DatabaseのIAMポリシー)。

タスク3: CSVサンプル・ファイルのアップロード

Apache Sparkのパワーを示すために、アプリケーションは1,000,000行のCSVファイルを読み取ります。このデータは、1つのコマンドラインでAutonomous Data Warehouseデータベースに挿入され、Kafkaストリーミング(Oracle Cloud Streaming)で公開されます。これらのリソースはすべてスケーラブルで、大量のデータに適しています。

GDPPERCAPTAという名前の新しい表が正常にインポートされたことを確認できます。

adw-table-imported.png

タスク4: ADW ADMINパスワード用のシークレットVaultの作成

セキュリティ上の理由から、ADW ADMINパスワードはVaultに保存されます。Oracle Cloud Vaultはこのパスワードをセキュリティでホストでき、アプリケーション上でOCI認証を使用してアクセスできます。

ノート: OCI VaultのIAMポリシーを確認します(OCI Vault IAMポリシー)。

タスク5: Kafkaストリーミングの作成(Oracle Cloud Streaming)

Oracle Cloud Streamingは、マネージド・ストリーミング・サービスなどのKafkaです。Kafka APIおよび共通SDKを使用してアプリケーションを開発できます。このチュートリアルでは、ストリーミングのインスタンスを作成し、大量のデータを公開して消費するために、両方のアプリケーションで実行されるように構成します。

  1. Oracle Cloudのメイン・メニューから、「アナリティクスとAI」「ストリーム」に移動します。

  2. コンパートメントをanalyticsに変更します。このデモのすべてのリソースは、このコンパートメントに作成されます。これにより、IAMをより安全で簡単に制御できます。

  3. 「Create Stream」をクリックします。

    create-stream.png

  4. 名前をkafka_like (たとえば)と入力し、他のすべてのパラメータをデフォルト値で保守できます。

    save-create-stream.png

  5. 「作成」をクリックしてインスタンスを初期化します。

  6. 「アクティブ」ステータスを待機します。これで、インスタンスを使用できます。

    ノート:ストリーミング作成プロセスでは、「デフォルトのストリーム・プールの自動作成」オプションを選択して、デフォルト・プールを自動的に作成できます。

  7. DefaultPoolリンクをクリックします。

    default-pool-option.png

  8. 接続設定を表示します。

    stream-conn-settings.png

    kafka-conn.png

  9. この情報は次のステップで必要になるため、注釈を付けます。

ノート: OCIストリーミングのIAMポリシーを確認します(OCIストリーミングのIAMポリシー)。

タスク6: KafkaにアクセスするためのAUTH TOKENの生成

OCI IAM上のユーザーに関連付けられた認証トークンを使用して、Oracle CloudのOCIストリーミング(Kafka API)およびその他のリソースにアクセスできます。Kafka接続設定では、SASL接続文字列には、前のタスクで説明したパスワードおよびAUTH_TOKEN値というパラメータがあります。OCIストリーミングへのアクセスを有効にするには、OCIコンソールのユーザーに移動し、AUTH TOKENを作成する必要があります。

  1. Oracle Cloudのメイン・メニューから、「アイデンティティとセキュリティ」「ユーザー」に移動します。

    ノート: AUTH TOKENを作成する必要があるユーザーは、これまでに作成されたリソースのOCI CLIおよびすべてのIAMポリシー構成で構成されているユーザーであることに注意してください。リソース:

    • Oracle Cloud Autonomous Data Warehouse
    • Oracle Cloudストリーミング
    • Oracle Object Storage
    • Oracle Data Flow
  2. ユーザー名をクリックして詳細を表示します。

    auth_token_create.png

  3. コンソールの左側にある「認証トークン」オプションをクリックし、「トークンの生成」をクリックします。

    ノート: トークンはこのステップでのみ生成され、ステップの完了後は表示されません。そのため、値をコピーして保存します。トークン値を失った場合は、認証トークンを再度生成する必要があります。

auth_token_1.png auth_token_2.png

タスク7: Demoアプリケーションの設定

このチュートリアルには、必要な情報を設定するデモ・アプリケーションがあります。

  1. 次のリンクを使用してアプリケーションをダウンロードします:

  2. Oracle Cloudコンソールで、次の詳細を確認します。

    • テナント・ネームスペース

      テナンシ-namespace-1.png

      tenancy-namespace-detail.png

    • パスワード・シークレット

      ボールト-adw.png

      vault-adw-detail.png

      シークレット-adw.png

    • ストリーミング接続設定

      kafka-conn.png

    • 認証トークン

      auth_token_create.png

      auth_token_2.png

  3. ダウンロードしたzipファイル(Java-CSV-DB.zipおよびJavaConsumeKafka.zip)を開きます。/src/main/java/exampleフォルダに移動し、Example.javaコードを見つけます。

    コード-variables.png

    これらは、テナンシ・リソース値で変更する必要がある変数です。

    変数名 リソース名 情報タイトル
    bootstrapServers ストリーミング接続設定 Bootstrapサーバー
    streamPoolId ストリーミング接続設定 SASL接続文字列のocid1.streampool.oc1.iad.....値
    kafkaUsername ストリーミング接続設定 SASL接続文字列内の" " "内のusenameの値
    kafkaPassword 認証トークン 値は作成ステップでのみ表示されます。
    OBJECT_STORAGE_NAMESPACE テナント・ネームスペース テナント
    ネームスペース テナント・ネームスペース テナント
    PASSWORD_SECRET_OCID PASSWORD_SECRET_OCID OCID

ノート:このデモ用に作成されたすべてのリソースは、US-ASHBURN-1リージョンにあります。作業するリージョンをチェックインします。リージョンを変更する場合は、2つのコード・ファイルの2つのポイントを変更する必要があります。

タスク8: Javaコードの理解

このチュートリアルはJavaで作成され、このコードはPythonにも移植できます。効率性とスケーラビリティを証明するために、統合プロセスの一般的なユース・ケースでいくつかの可能性を示すアプリケーションを開発しました。したがって、アプリケーションのコードは次の例を示しています。

このデモは、ローカル・マシンで実行し、データ・フロー・インスタンスにデプロイしてジョブ実行として実行できます。

ノート:データ・フロー・ジョブおよびローカル・マシンの場合は、OCI CLI構成を使用してOCIリソースにアクセスします。データ・フロー側では、すべてが事前構成されているため、パラメータを変更する必要はありません。ローカル・マシン側で、OCI CLIをインストールし、OCIリソースにアクセスするためのテナント、ユーザーおよび秘密キーを構成しておく必要があります。

Example.javaコードをセクションに表示します:

タスク9: Mavenを使用したアプリケーションのパッケージ化

Sparkでジョブを実行する前に、Mavenでアプリケーションをパッケージ化する必要があります。

  1. /DataflowSparkStreamDemoフォルダに移動し、次のコマンドを実行します。

    mvn package

  2. Mavenがパッケージを開始していることがわかります。

    maven-package-1a.png

  3. すべてが正しい場合は、「成功」メッセージが表示されます。

    maven-success-1a.png

タスク10: 実行の確認

  1. 次のコマンドを実行して、ローカルSparkマシンでアプリケーションをテストします:

    spark-submit --class example.Example target/consumekafka-1.0-SNAPSHOT.jar

  2. Oracle Cloud Streaming Kafkaインスタンスに移動し、「テスト・メッセージの作成」をクリックして、リアルタイム・アプリケーションをテストするためのデータを生成します。

    テスト-kafka-1.png

  3. このJSONメッセージをKafkaトピックに配置できます。

    {"Organization Id": "1235", "Name": "Teste", "Country": "Luxembourg"}

    テスト-kafka-2.png

  4. 「生成」をクリックするたびに、アプリケーションに1つのメッセージを送信します。アプリケーションの出力ログは次のようになります。

    • これは、kafkaトピックから読み取られたデータです。

      テスト-output-1.png

    • これはADW表からマージされたデータです。

      テスト-output-2.png

タスク11: データ・フロー・ジョブの作成および実行

これで、両方のアプリケーションがローカルSparkマシンで成功して実行され、テナンシのOracle Cloudデータ・フローにデプロイできるようになりました。

ノート: Oracle Object StorageやOracle Streaming (Kafka)などのリソースへのアクセスを構成するには、Sparkストリーミングのドキュメントを参照してください: データ・フローへのアクセスの有効化

  1. オブジェクト・ストレージにパッケージをアップロードします。

    • データ・フロー・アプリケーションを作成する前に、Javaアーティファクト・アプリケーション(***-SNAPSHOT.jarファイル)をappsという名前のオブジェクト・ストレージ・バケットにアップロードする必要があります。
  2. データ・フロー・アプリケーションの作成

    • Oracle Cloudのメイン・メニューを選択し、「分析およびAI」および「データ・フロー」に移動します。データ・フロー・アプリケーションを作成する前に、必ずanalyticsコンパートメントを選択してください。

    • 「アプリケーションの作成」をクリックします。

      create-dataflow-app.png

    • このようなパラメータを入力します。

      データフロー-app.png

    • 「作成」をクリックします。

    • 作成後、「デモのスケーリング」リンクをクリックして詳細を表示します。ジョブを実行するには、「実行」をクリックします。

      ノート: 「拡張オプションの表示」をクリックして、Sparkストリーム実行タイプのOCIセキュリティを有効にします。

      詳細- options.png

  3. 次のオプションをアクティブ化します。

    プリンシパル-execution.png

  4. 「実行」をクリックしてジョブを実行します。

  5. パラメータを確認し、「実行」を再度クリックします。

    dataflow-run-job.png

    • ジョブのステータスを表示できます。

      dataflow-run-status.png

    • 「ステータス」が「成功」になるまで待って、結果を確認できます。

      dataflow-run-success.png

謝辞

その他の学習リソース

docs.oracle.com/learnで他のラボをご覧いただくか、Oracle Learning YouTubeチャネルでより無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。

製品ドキュメントについては、Oracle Help Centerを参照してください。