ノート:
- このチュートリアルではOracle Cloudへのアクセスが必要です。無料アカウントにサインアップするには、Oracle Cloud Infrastructure Free Tierの開始を参照してください。
- Oracle Cloud Infrastructure資格証明、テナンシおよびコンパートメントの値の例を使用します。演習を完了する場合は、これらの値をクラウド環境に固有の値に置き換えてください。
Oracle Cloud Infrastructure Data FlowでAutonomous DatabaseおよびKafkaで大きなファイルを処理
イントロダクション
Oracle Cloud Infrastructure (OCI) Data Flowは、Apache Spark TMアプリケーションを実行するためのフルマネージド・サービスです。データ・フローは、大規模なファイル、ストリーミング、データベース操作の処理に使用され、非常に高いスケーラブルな処理で多くのアプリケーションを構築できます。Apache Sparkは、クラスタ化されたマシンをスケーリングして使用し、最小構成でジョブをパラレル化できます。
Apache Sparkを管理対象サービス(データ・フロー)として使用すると、クラウド処理能力を乗算する多くのスケーラブル・サービスを追加できます。このチュートリアルでは、次の方法について説明します。
- オブジェクト・ストレージ: 低コストでスケーラブルなファイル・リポジトリ
- Autonomous Database: クラウドでスケーラブルなデータベースとして
- ストリーミング: スケーラブルなKafka管理対象サービスとして
このチュートリアルでは、大きいファイルの処理、データベースの問合せおよびデータのマージ/結合に使用される最も一般的なアクティビティを参照して、メモリー内の別の表を作成できます。この膨大なデータを、非常に低コストで高パフォーマンスなKafkaキューに書き込むことができます。
目的
- データ・フローを使用して大量のデータを処理する方法について学習します
- スケーラブル・サービスを統合する方法を学習します: ファイル・リポジトリ、データベースおよびキュー
前提条件
-
運用中のOracle Cloudテナント: 1か月あたりUS$ 300.00の無料のOracle Cloudアカウントを作成して、このチュートリアルを試すことができます。無料Oracle Cloudアカウントを作成を参照してください
-
ローカル・マシンにインストールされたOCI CLI (Oracle Cloudコマンドライン・インタフェース): これは、OCI CLIをインストールするためのリンクです。
-
ローカル・マシンにインストールされているApache Sparkアプリケーション。ローカルおよびデータ・フローでの開発方法を理解するには、Oracle Cloud Infrastructure Data Flowアプリケーションのローカルでの開発、クラウドへのデプロイを参照してください。
ノート: これは、Apache Sparkをインストールするための公式ページです。オペレーショナル・システム(Linux/Mac OS/Windows)のタイプごとにApache Sparkをインストールする別の手順があります。
-
Spark Submit CLIがインストールされています。これは、Spark Submit CLIをインストールするためのリンクです。
-
ローカル・マシンにインストールされたMaven。
-
OCIの概念の知識:
- 区分
- IAMポリシー
- テナント
- リソースのOCID
タスク1: Object Storage構造の作成
Object Storageは、デフォルトのファイル・リポジトリとして使用されます。他のタイプのファイル・リポジトリを使用できますが、Object Storageは、パフォーマンスに優れたファイルを操作するためのシンプルで低コストの方法です。このチュートリアルでは、両方のアプリケーションがオブジェクト・ストレージから大きなCSVファイルをロードし、Apache Sparkが高速で大量のデータを処理するスマートさを示します。
-
コンパートメントの作成: コンパートメントは、クラウド・リソースを編成および分離するために重要です。IAMポリシーによってリソースを分離できます。
-
このリンクを使用して、コンパートメントのポリシーを理解および設定できます: コンパートメントの管理
-
1つのコンパートメントを作成して、このチュートリアルの2つのアプリケーションのすべてのリソースをホストします。analyticsという名前のコンパートメントを作成します。
-
Oracle Cloudのメイン・メニューに移動し、「アイデンティティとセキュリティ」、「コンパートメント」を検索します。「コンパートメント」セクションで、「コンパートメントの作成」をクリックし、名前を入力します。
ノート: ユーザーのグループへのアクセス権を付与し、ユーザーを含める必要があります。
-
「コンパートメントの作成」をクリックして、コンパートメントを含めます。
-
-
オブジェクト・ストレージでのバケットの作成: バケットはオブジェクトを格納するための論理コンテナであるため、このデモに使用されるすべてのファイルはこのバケットに格納されます。
-
Oracle Cloudのメイン・メニューに移動し、「ストレージ」および「バケット」を検索します。「バケット」セクションで、前に作成したコンパートメント(分析)を選択します。
-
「バケットの作成」をクリックします。4つのバケットを作成します: apps、data、dataflow-logs、Wallet
-
これらの4つのバケットとともにバケット名情報を入力し、デフォルトの選択で他のパラメータを保守します。
-
バケットごとに、「作成」をクリックします。作成されたバケットを確認できます。
-
ノート:バケットのIAMポリシーを確認します。デモ・アプリケーションでこれらのバケットを使用する場合は、ポリシーを設定する必要があります。オブジェクト・ストレージの概要およびIAMポリシーで、概念および設定を確認できます。
タスク2: Autonomous Databaseの作成
Oracle Cloud Autonomous Databaseは、Oracle Databaseの管理対象サービスです。このチュートリアルでは、アプリケーションはセキュリティ上の理由からWalletを介してデータベースに接続します。
-
Autonomous Databaseのプロビジョニングの説明に従って、Autonomous Databaseをインスタンス化します。
-
Oracle Cloudのメイン・メニューから、「データ・ウェアハウス」オプションを選択し、「Oracle Database」および「Autonomous Data Warehouse」を選択して、コンパートメント分析を選択し、チュートリアルに従ってデータベース・インスタンスを作成します。
-
インスタンスに「Processed Logs」という名前を付け、データベース名として「logs」を選択し、アプリケーションのコードを変更する必要はありません。
-
ADMINパスワードを入力し、Wallet zipファイルをダウンロードします。
-
データベースの作成後、ADMINユーザー・パスワードを設定し、Wallet zipファイルをダウンロードできます。
-
Wallet zipファイル(
Wallet_logs.zip
)を保存し、ADMINパスワードに注釈を付けます。アプリケーション・コードを設定する必要があります。 -
「ストレージ」、「バケット」に移動します。analyticsコンパートメントに変更すると、Walletバケットが表示されます。これをクリックします。
-
Wallet zipファイルをアップロードするには、「アップロード」をクリックしてWallet_logs.zipファイルを添付します。
ノート: Autonomous DatabaseにアクセスするためのIAMポリシーを確認します(Autonomous DatabaseのIAMポリシー)。
タスク3: CSVサンプル・ファイルのアップロード
Apache Sparkのパワーを示すために、アプリケーションは1,000,000行のCSVファイルを読み取ります。このデータは、1つのコマンドラインでAutonomous Data Warehouseデータベースに挿入され、Kafkaストリーミング(Oracle Cloud Streaming)で公開されます。これらのリソースはすべてスケーラブルで、大量のデータに適しています。
-
次の2つのリンクをダウンロードし、データ・バケットにアップロードします:
-
ノート:
- organizations.csvには、ローカルマシン上でアプリケーションをテストするだけの100行しかありません。
- organizations1M.csvには1,000,000行が含まれ、データ・フロー・インスタンスでの実行に使用されます。
-
Oracle Cloudのメイン・メニューから、「ストレージ」および「バケット」に移動します。データ・バケットをクリックし、前のステップの2つのファイルをアップロードします。
-
ADWデータベースへの補助表のアップロード
-
このファイルをダウンロードしてADWデータベースにアップロードします: GDP PER CAPTA COUNTRY.csv
-
Oracle Cloudのメイン・メニューから、「Oracle Database」および「Autonomous Data Warehouse」を選択します。
-
「処理済ログ」インスタンスをクリックして詳細を表示します。
-
「データベース・アクション」をクリックして、データベース・ユーティリティに移動します。
-
ADMINユーザーの資格証明を入力します。
-
「SQL」オプションをクリックして、問合せユーティリティに移動します。
-
「データ・ロード」をクリックします。
-
GDP PER CAPTA COUNTRY.csvファイルをコンソールパネルにドロップして、データをテーブルにインポートします。
-
GDPPERCAPTAという名前の新しい表が正常にインポートされたことを確認できます。
タスク4: ADW ADMINパスワード用のシークレットVaultの作成
セキュリティ上の理由から、ADW ADMINパスワードはVaultに保存されます。Oracle Cloud Vaultはこのパスワードをセキュリティでホストでき、アプリケーション上でOCI認証を使用してアクセスできます。
-
次のドキュメントの説明に従って、ボールトにシークレットを作成します: Vaultへのデータベース管理パスワードの追加
-
アプリケーションでPASSWORD_SECRET_OCIDという名前の変数を作成し、OCIDを入力します。
ノート: OCI VaultのIAMポリシーを確認します(OCI Vault IAMポリシー)。
タスク5: Oracle Cloud StreamingサービスでのKafkaストリーミングの作成
Oracle Cloud Streamingは、マネージド・ストリーミング・サービスなどのKafkaです。Kafka APIおよび共通SDKを使用してアプリケーションを開発できます。このチュートリアルでは、ストリーミングのインスタンスを作成し、大量のデータを公開して消費するために、両方のアプリケーションで実行されるように構成します。
-
Oracle Cloudのメイン・メニューから、「アナリティクスとAI」、「ストリーム」に移動します。
-
コンパートメントをanalyticsに変更します。このデモのすべてのリソースは、このコンパートメントに作成されます。これにより、IAMをより安全で簡単に制御できます。
-
「ストリームの作成」をクリックします。
-
名前をkafka_like (たとえば)と入力し、他のすべてのパラメータをデフォルト値で保守できます。
-
「作成」をクリックしてインスタンスを初期化します。
-
「アクティブ」ステータスを待機します。これでインスタンスを使用できます。
ノート:ストリーミング作成プロセスでは、「デフォルトのストリーム・プールの自動作成」オプションを選択して、デフォルト・プールを自動的に作成できます。
-
DefaultPoolリンクをクリックします。
-
接続設定を表示します。
-
この情報は次のステップで必要になるため、注釈を付けます。
ノート: OCIストリーミングのIAMポリシーを確認します(OCIストリーミングのIAMポリシー)。
タスク6: KafkaにアクセスするためのAUTH TOKENの生成
OCI IAM上のユーザーに関連付けられた認証トークンを使用して、Oracle CloudのOCIストリーミング(Kafka API)およびその他のリソースにアクセスできます。Kafka接続設定では、SASL接続文字列には、前のタスクで説明したパスワードおよびAUTH_TOKEN値というパラメータがあります。OCIストリーミングへのアクセスを有効にするには、OCIコンソールのユーザーに移動し、AUTH TOKENを作成する必要があります。
-
Oracle Cloudのメイン・メニューから、「アイデンティティとセキュリティ」、「ユーザー」に移動します。
ノート: AUTH TOKENを作成する必要があるユーザーは、これまでに作成されたリソースのOCI CLIおよびすべてのIAMポリシー構成で構成されているユーザーであることに注意してください。リソースは次のとおりです。
- Oracle Cloud Autonomous Data Warehouse
- Oracle Cloudストリーミング
- Oracle Object Storage
- Oracle Data Flow
-
ユーザー名をクリックして詳細を表示します。
-
コンソールの左側にある「認証トークン」オプションをクリックし、「トークンの生成」をクリックします。
ノート: トークンはこのステップでのみ生成され、ステップの完了後は表示されません。そのため、値をコピーして保存します。トークン値を失った場合は、認証トークンを再度生成する必要があります。
タスク7: デモ・アプリケーションの設定
このチュートリアルには、必要な情報を設定する2つのデモ・アプリケーションがあります。
-
Java-CSV-DB:このアプリケーションは、csvファイル(organizations1M.csv)の1,000,000行を読み取り、データベース(Oracle Cloud Autonomous Data Warehouse)およびKafkaストリーミング(Oracle Cloud Streaming)と統合するための一般的なシナリオで通常のプロセスを実行します。
このデモでは、CSVデータセットをデータベースの補助表とマージする方法、および3番目のデータセットをメモリーに生成するクロス・タイプの表を示しています。実行後、データセットがADWに挿入され、Kafkaストリーミングに公開されます。
-
JavaConsumeKafka:このアプリケーションは、処理量が多いためにCPUとメモリーを実行するだけで、最初のアプリケーションのいくつかのステップを繰り返します。違いは、最初のアプリケーションはKafkaストリーミングにパブリッシュし、このアプリケーションはストリーミングから読み取ります。
-
次のリンクを使用してアプリケーションをダウンロードします。
-
Oracle Cloudコンソールで、次の詳細を確認します。
-
テナンシ・ネームスペース
-
パスワード・シークレット
-
ストリーミング接続設定
-
認証トークン
-
-
ダウンロードしたzipファイル(
Java-CSV-DB.zip
およびJavaConsumeKafka.zip
)を開きます。/src/main/java/exampleフォルダに移動し、Example.javaコードを見つけます。-
これらは、テナンシ・リソース値で変更する必要がある変数です。
変数名 リソース名 情報タイトル ネームスペース テナンシ・ネームスペース テナント OBJECT_STORAGE_NAMESPACE テナンシ・ネームスペース テナント PASSWORD_SECRET_OCID PASSWORD_SECRET_OCID OCID streamPoolId ストリーミング接続設定 SASL接続文字列のocid1.streampool.oc1.iad.....値 kafkaUsername ストリーミング接続設定 SASL接続文字列内の" " "内のusenameの値 kafkaPassword 認証トークン 値は作成ステップでのみ表示されます。
-
ノート:このチュートリアルで作成されるすべてのリソースは、US-ASHBURN-1リージョンにあります。作業するリージョンをチェックインします。リージョンを変更する場合は、2つのコード・ファイルで次の詳細を変更する必要があります。
Example.java: bootstrapServers変数を変更し、「us-ashburn-1」を新しいリージョンに置き換えます。
OboTokenClientConfigurator.java: 新しいリージョンでCANONICAL_REGION_NAME変数を変更します。
タスク8: Javaコードの理解
このチュートリアルはJavaで作成され、このコードはPythonにも移植できます。チュートリアルは2つの部分に分かれています。
-
Kafka Streamingに公開するアプリケーション1
-
Kafkaストリーミングから消費するアプリケーション2
効率性とスケーラビリティを証明するために、両方のアプリケーションが開発され、統合プロセスの一般的なユース・ケースでいくつかの可能性が示されました。したがって、両方のアプリケーションのコードは、次の例を示しています。
-
1,000,000行のCSVファイルの読取り
-
JDBC接続を介して接続するためのADW Walletの準備
-
1,000,000行のCSVデータをADWデータベースに挿入
-
SQL文を実行してADW表を問い合せます。
-
SQL文を実行して、ADWデータセット表を含むCSVデータセットを結合します
-
CSVデータセットをループして、データとの反復を示す
-
Kafkaストリーミングの操作
このデモはローカル・マシンで実行でき、データ・フロー・インスタンスにデプロイしてジョブ実行として実行できます。
ノート:データ・フロー・ジョブとローカル・マシンの両方で、OCI CLI構成を使用してOCIリソースにアクセスします。データ・フロー側では、すべてが事前構成されているため、パラメータを変更する必要はありません。ローカル・マシン側で、OCI CLIをインストールし、OCIリソースにアクセスするためのテナント、ユーザーおよび秘密キーを構成します。
Example.java
コードをセクションに表示します:
-
Apache Sparkの初期化: コードのこの部分は、Sparkの初期化を表します。実行プロセスを実行するほとんどの構成は自動的に構成されるため、Sparkエンジンでの作業は非常に簡単です。
-
大きなファイルを様々な形式で読み取ります。Apache SparkエンジンとSDKを使用すると、高速なロードおよび書込みファイル形式が可能になります。大容量ボリュームは、秒単位でもミリ秒単位でも操作できます。そのため、メモリー内のMERGE、FILTER、JOINデータセットを操作し、異なるデータ・ソースを操作できます。
-
ADW Vaultシークレットの読取り: コードのこの部分はボールトにアクセスして、ADWインスタンスのシークレットを取得します。
-
JDBCを介して接続するための
Wallet.zip
ファイルの読取り: この項では、オブジェクト・ストレージからWallet.zip
ファイルをロードし、JDBCドライバを構成する方法を示します。 -
CSVデータセットの1,000,000行をADWデータベースに挿入: CSVデータセットから、ADWデータベースに直接バッチ挿入できます。Apache Sparkは、クラスタ化されたマシン、CPUおよびメモリーのすべてのパワーを使用して実行を最適化し、最高のパフォーマンスを得ることができます。
-
データ変換: 多くのCSVファイルのロード、データセット、JOIN、フィルタ、列の削除、いくつかのコード行での計算およびその他の多くの操作をわずかな時間で問い合せ、任意の形式で書込み操作を実行することを想像してください。この例では、CSVデータセットおよびADWデータベース・データセットからoracleDF2という名前の新しいデータセットが作成されました。
-
ループ内のデータセットを使用して反復: CSVデータセット(1,000,000行)に対するループの反復の例です。rowオブジェクトには、CSVフィールド構造のマッピングが含まれています。そのため、各行のデータを取得し、APIコールおよびその他の多くの操作を実行できます。
-
Kafka操作: これは、Kafka APIを使用してOCIストリーミングに接続するための準備です。
ノート: Oracle Cloud Streamingは、ほとんどのKafka APIと互換性があります。
-
接続パラメータを構成した後、コードはストリーミングを生成および消費する方法を示しています。
タスク9: Mavenを使用したアプリケーションのパッケージ化
Apache Sparkでジョブを実行する前に、アプリケーションをMavenでパッケージ化する必要があります。Mavenは、ライブラリおよびプラグインを使用してアプリケーションをパッケージ化する最も既知のユーティリティの1つです。
注意:
CSVファイルを100行のみで変更する高速テストを実行できます。これを行うには、Example.javaファイルで次のコードを見つけます。private static String INPUT_PATH = "oci://data@" + OBJECT_STORAGE_NAMESPACE + "/organizations1M.csv";
organizations1M.csv
をorganizations.csv
に置き換えると、実行が大幅に高速になります。
-
Java-CSV-DBパッケージ
-
/Java-CSV-DBフォルダに移動し、次のコマンドを実行します。
mvn package
-
Mavenがパッケージを開始していることがわかります。
-
すべてが正しい場合は、「成功」メッセージが表示されます。
-
ローカルApache Sparkマシンでアプリケーションをテストするには、次のコマンドを実行します:
spark-submit --class example.Example target/loadadw-1.0-SNAPSHOT.jar
-
-
JavaConsumeKafkaパッケージ
-
/JavaConsumeKafkaフォルダに移動し、次のコマンドを実行します。
mvn package
-
Mavenがパッケージを開始していることがわかります。
-
すべてが正しい場合は、「成功」メッセージが表示されます。
-
ローカルApache Sparkマシンでアプリケーションをテストするには、次のコマンドを実行します:
spark-submit --class example.Example target/loadkafka-1.0-SNAPSHOT.jar
-
タスク10: 実行の確認
-
ADW挿入の確認
-
Oracle Cloudのメイン・メニューに移動し、「Oracle Database」および「Autonomous Data Warehouse」を選択します。
-
「処理済ログ」インスタンスをクリックして詳細を表示します。
-
「データベース・アクション」をクリックして、データベース・ユーティリティに移動します。
-
ADMINユーザーの資格証明を入力します。
-
「SQL」オプションをクリックして、問合せユーティリティに移動します。
-
問合せを実行して、表の1,000,000行を表示します。
-
-
実行ログの確認
-
ジョブがデータセットにアクセスしてロードできる場合は、実行ログで確認できます。
-
タスク11: データ・フロー・ジョブの作成および実行
これで、両方のアプリケーションがローカルApache Sparkマシンで正常に実行され、テナンシのOracle Cloudデータ・フローにデプロイできるようになりました。
-
Oracle Cloudのメイン・メニューから、「アナリティクスとAI」および「データ・フロー」に移動します。
-
データ・フロー・アプリケーションを作成する前に、必ずanalyticsコンパートメントを選択してください。
-
「アプリケーションの作成」をクリックします。
-
次のイメージに示すように、パラメータを完了します。
-
「作成」をクリックします。
-
作成後、「デモのスケーリング」リンクをクリックして詳細を表示します。
-
「実行」をクリックして、ジョブを実行します。
-
パラメータを確認し、「実行」を再度クリックします。
-
ジョブのステータスを表示し、「ステータス」が「成功」に変わり、結果が表示されるまで待ちます。
次のステップ
最初のアプリケーションは、データをKafka Streamingに公開します。2番目のアプリケーションは、Kafkaからこのデータを消費します。
-
最初のデータ・フロー・アプリケーションを作成したときと同じステップを使用して、別のデータ・フロー・アプリケーションを作成します。
-
アプリケーションの名前を変更し、パッケージをloadadw-1.0-SNAPSHOT.jarからloadkafka-1.0-SNAPSHOT.jarに変更する必要があります。
-
他のパラメータは、最初のデータ・フロー・アプリケーションと同じに保ち、ジョブを実行できます。
関連リンク
謝辞
- 著者 - Cristiano Hoshikawa (LAD Aチーム・ソリューション・エンジニア)
その他の学習リソース
docs.oracle.com/learnで他のラボをご覧いただくか、Oracle Learning YouTubeチャネルでより無料のラーニング・コンテンツにアクセスしてください。また、education.oracle.com/learning-explorerにアクセスして、Oracle Learning Explorerになります。
製品ドキュメントについては、Oracle Help Centerを参照してください。
Process large files in Autonomous Database and Kafka with Oracle Cloud Infrastructure Data Flow
F79141-01
March 2023
Copyright © 2023, Oracle and/or its affiliates.