13 Kinesis Streamsハンドラの使用
Kinesis Streamsハンドラの使用方法について説明します。このハンドラは、Amazonクラウドまたは該当する環境でホストされているアプリケーションにデータをストリーミングします。
トピック:
13.1 概要
Amazon Kinesisは、Amazon Cloudでホストされているメッセージング・システムです。Kinesis streamsを使用して、Amazon S3やAmazon Redshiftなどの他のAmazon Cloudアプリケーションにデータをストリーミングできます。Kinesis Streamsハンドラを使用すると、Amazon Cloudまたはユーザーのサイトでホストされているアプリケーションにデータをストリーミングすることもできます。Amazon Kinesisストリームでは、Apache Kafkaと類似した機能が提供されます。
論理コンセプトのマップは次のとおりです。
-
Kafkaのトピック = Kinesisのストリーム
-
Kafkaのパーティション = Kinesisのシャード
Kinesisのストリームには少なくとも1つのシャードが必要です。
親トピック: Kinesis Streamsハンドラの使用
13.2 詳細な機能
トピック:
13.2.1 Amazon Kinesis Java SDK
Oracle GoldenGate Kinesis Streamsハンドラは、AWS Kinesis Java SDKを使用してAmazon Kinesisにデータをプッシュします。次の場所でAmazon Kinesis Streams Developer Guideを参照してください。
http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html。
Kinesis Steamsハンドラは、最新のAWS Kinesis Java SDKバージョン1.11.107で設計およびテストされました。依存関係は次のとおりです。
-
グループID:
com.amazonaws
-
アーティファクトID:
aws-java-sdk-kinesis
-
バージョン:
1.11.107
Oracle GoldenGate for Big Dataは、AWS Kinesis Java SDKに付属していません。動作保証マトリックスに記載されているAWS Kinesis Java SDKを使用することをお薦めします。動作保証要件とシステム要件の確認を参照してください。
注意:
最新のAWS Kinesis Java SDKへの移行はインタフェースに変更がないことを前提としており、そうでない場合はKinesis Streamsハンドラとの互換性が失われる可能性があります。Kinesisを含め、AWS Java SDKは次のサイトからダウンロードできます。
親トピック: 詳細な機能
13.2.2 Kinesis Streamsの入力制限
1つのシャードを持つKinesisストリームの入力上限は、1秒当たり1000メッセージ、1秒当たりの合計データ・サイズが1MBまでです。ストリームまたはシャードを追加すると、次のように潜在的スループットが増加する可能性があります。
-
2つのシャードを持つ1つのストリーム = 2000メッセージ/秒、合計データ・サイズは2MB/秒
-
それぞれが1つのシャードを持つ3つのストリーム = 3000メッセージ/秒、合計データ・サイズは3MB/秒
Kinesis Streamsハンドラで実現できるスケーリングは、ハンドラの構成方法によって異なります。Kinesisストリームの名前は、実行時にKinesis Streamsハンドラの構成に基づいて解決されます。
シャードは、パーティション・キーのハッシュによって選択されます。Kinesisメッセージのパーティション・キーは、nullまたは空の文字列(""
)にはできません。nullまたは空の文字列のパーティション・キーにより、Kinesisエラーが発生し、Replicatプロセスが異常終了します。
スループットを最大化するには、Kinesis Streamsハンドラの構成で、ストリームとシャードの間でメッセージを均等に配信する必要があります。
親トピック: 詳細な機能
13.3 Kinesis Streamsハンドラの設定および実行
ここでは、Kinesis Streamsハンドラのコンポーネントの構成およびハンドラの実行について説明します。
Kinesis Streamsハンドラを設定するには、次のステップを実行します。
-
https://aws.amazon.com/で、Amazon AWSアカウントを作成します。
-
Amazon AWSにログインします。
-
メイン・ページから「Kinesis」を選択します(「Analytics」サブセクションの下)。
-
Amazon Kinesis Streamsの「Go to Streams」を選択して、Amazon Kinesisストリームおよびストリーム内のシャードを作成します。
-
KinesisにアクセスするためのクライアントIDとシークレットを作成します。
Kinesis Streamsハンドラでは、実行時にKinesisに正常に接続するためにこれらの資格証明を必要とします。
-
クライアントIDとシークレットの作成:
-
「AWS」(右上)で自分の名前を選択し、そのリストで「My Security Credentials」を選択します。
-
アクセス・キーを作成および管理するには、「Access Keys」を選択します。
作成時にクライアントIDとシークレットをメモします。
このクライアントIDとシークレットは、作成時にのみアクセスできます。紛失した場合は、アクセス・キーを削除して再作成する必要があります。
-
トピック:
- Kinesis Streamsハンドラでのクラスパスの設定
- Kinesis Streamsハンドラの構成
- テンプレートを使用したストリーム名とパーティション名の解決
- KinesisハンドラでのクライアントIDとシークレットの構成
- Kinesis Streamsハンドラ用のプロキシ・サーバーの構成
- Kinesis Streamsハンドラでのセキュリティの構成
親トピック: Kinesis Streamsハンドラの使用
13.3.1 Kinesis Streamsハンドラでのクラスパスの設定
Javaアダプタのプロパティ・ファイルでgg.classpath
プロパティを構成して、AWS Kinesis Java SDKのJARを次のように指定する必要があります。
gg.classpath={download_dir}/aws-java-sdk-1.11.107/lib/*:{download_dir}/aws-java-sdk-1.11.107/third-party/lib/*
親トピック: Kinesis Streamsハンドラの設定および実行
13.3.2 Kinesis Streamsハンドラの構成
Kinesis Streamsハンドラの操作は、プロパティ・ファイルを使用して構成します。これらのプロパティは、Javaアダプタ・プロパティ・ファイルにあります(Replicatプロパティ・ファイルにはありません)。
Kinesis Streamsハンドラの選択を有効にするには、まずgg.handler.name.type=kinesis_streams
を指定してハンドラ・タイプを構成してから、次に示す他のKinesis Streamsプロパティを構成する必要があります。
表13-1 Kinesis Streamsハンドラの構成プロパティ
プロパティ | 必須/オプション | 有効な値 | デフォルト | 説明 |
---|---|---|---|---|
gg.handler.name.type |
必須 |
|
なし |
Kinesis Streamsハンドラを選択し、変更データ取得をKinesisにストリーミングします。 |
gg.handler.name.region |
必須 |
KinesisインスタンスをホストしているAmazonのリージョン名。 |
なし |
Amazon AWSリージョン名の設定は必須です。 |
gg.handler.name.proxyServer |
オプション |
プロキシ・サーバーのホスト名。 |
なし |
プロキシ・サーバー経由でAWSへの接続が必要な場合は、プロキシ・サーバーのホスト名を設定します。 |
gg.handler.name.proxyPort |
オプション |
プロキシ・サーバーのポート番号。 |
なし |
プロキシ・サーバー経由でAWSへの接続が必要な場合は、プロキシ・サーバーのポート名を設定します。 |
gg.handler.name.proxyUsername |
オプション |
プロキシ・サーバーのユーザー名(資格証明が必要な場合)。 |
なし |
プロキシ・サーバー経由でAWSへの接続が必要で、プロキシ・サーバーに資格証明が必要な場合は、プロキシ・サーバーのユーザー名を設定します。 |
gg.handler.name.proxyPassword |
オプション |
プロキシ・サーバーのパスワード(資格証明が必要な場合)。 |
なし |
プロキシ・サーバー経由でAWSへの接続が必要で、プロキシ・サーバーに資格証明が必要な場合は、プロキシ・サーバーのパスワードを設定します。 |
gg.handler.name.deferFlushAtTxCommit |
オプション |
|
|
falseに設定すると、Kinesis Streamsハンドラでは、書込み永続性のためにトランザクションのコミット時にKinesisにデータがフラッシュされます。ただし、パフォーマンスのためにトランザクションのコミットを超えてフラッシュを遅延することが望ましい場合があります(「Kinesisハンドラのパフォーマンスに関する考慮事項」を参照してください)。 |
gg.handler.name.deferFlushOpCount |
オプション |
整数 |
なし |
|
gg.handler.name.formatPerOp |
オプション |
|
|
|
gg.handler.name.customMessageGrouper |
オプション |
oracle.goldengate.handler.kinesis.KinesisJsonTxMessageGrouper |
なし |
この構成パラメータでは、カスタムのロジックを使用してKinesisメッセージをグループ化する機能が提供されます。現時点では、配布には1つの実装のみが含まれています。 |
gg.handler.name.streamMappingTemplate |
必須 |
実行時にKinesisメッセージのパーティション・キー(メッセージ・キー)を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。 |
gg.handler.name.partitionMappingTemplate |
必須 |
実行時にKinesisメッセージのパーティション・キー(メッセージ・キー)を解決するためのテンプレート文字列の値。 |
なし |
詳細は、「テンプレートを使用したストリーム名とパーティション名の解決」を参照してください。 |
gg.hander.name.format |
必須 |
サポートされている任意のプラガブル・フォーマッタ。 |
|
操作メッセージのフォーマッタを選択します。Kinesisには、おそらくJSONが最適です。 |
|
オプション |
|
|
Kinesisハンドラのデフォルトでは、Kinesisストリームがまだ存在しない場合は自動的に作成されます。Kinesisストリームの自動作成を無効にする場合は、 |
|
オプション |
正の整数。 |
|
Kinesisストリームには1つ以上のシャードが含まれます。Kinesisハンドラによって作成されるKinesisストリームのシャード数を制御します。複数のシャードにより、Kinesisストリームへの収集のパフォーマンスを向上させることができます。 |
|
オプション |
|
|
セキュリティ・レベルの追加のためにプロキシ・サーバーへのプロキシ・プロトコル接続を設定します。クライアントは最初にプロキシ・サーバーとのSSLハンドシェイクを実行し、次にAmazon AWSとのSSLハンドシェイクを実行します。この機能はバージョン1.11.396のAmazon SDKに追加されたため、このプロパティを使用するには少なくともこのバージョンを使用する必要があります。 |
親トピック: Kinesis Streamsハンドラの設定および実行
13.3.3 テンプレートを使用したストリーム名とパーティション名の解決
Kinesis Streamsハンドラでは、テンプレートの構成値を使用して実行時にストリーム名とパーティション・キーを解決する機能が提供されます。テンプレートを使用すると、静的な値およびキーワードを構成できます。キーワードは、そのときの処理コンテキストに応じてキーワードを動的に置き換えるために使用されます。テンプレートは、次の構成パラメータに適用されます。
gg.handler.name.streamMappingTemplate
gg.handler.name.partitionMappingTemplate
テンプレートのモード
ソース・データベース・トランザクションは、それぞれが挿入、更新および削除である、1つ以上の個別の操作から構成されます。Kinesisハンドラは、操作ごとに1つのメッセージ(挿入、更新、削除)を送信するように構成することも、トランザクション・レベルで操作をメッセージにグループ化するように構成することもできます。テンプレート・キーワードの多くは、個々のソース・データベース操作のコンテキストに基づいてデータを解決します。したがって、トランザクション・レベルでメッセージを送信する場合には、キーワードの多くは機能しません。たとえば、トランザクション・レベルでメッセージを送信する場合、${fullyQualifiedTableName}
は機能しません。${fullyQualifiedTableName}
プロパティは、操作のための修飾されたソース表名に解決されます。トランザクションには、多くのソース表に対する複数の操作を含めることができます。トランザクション・レベルでのメッセージの完全修飾表名の解決は、非決定的であるため、実行時に異常終了します。
テンプレートのキーワード
次の表は、現在サポートされているキーワード・テンプレートをリストしており、トランザクション・レベルのメッセージでそのキーワードがサポートされているかどうかを示す列が含まれています。
キーワード | 説明 | トランザクション・メッセージのサポート |
---|---|---|
|
カタログ、スキーマおよび表名の間にデリミタのピリオド( たとえば、 |
いいえ |
|
カタログ名に解決されます。 |
いいえ |
|
スキーマ名に解決されます |
いいえ |
|
短い表名に解決されます。 |
いいえ |
|
操作の種類(INSERT、UPDATE、DELETEまたはTRUNCATE)に解決されます |
いいえ |
|
アンダースコア( |
いいえ |
|
ソース証跡ファイルのシーケンス番号の後にオフセット(RBA)が続きます。 |
はい |
|
ソース証跡ファイルからの操作タイムスタンプ。 |
はい |
|
""に解決されます。 |
はい |
|
Replicatプロセスの名前に解決されます。調整された配信を使用している場合は、レプリケートのスレッド番号が付加されたReplicatプロセスの名前に解決されます。 |
はい |
|
キーが完全修飾表名である静的な値に解決されます。キーおよび値は、大カッコの内部に次の形式で指定します。
|
いいえ |
|
キーが完全修飾表名であり、値が解決される列名である列の値に解決されます。次に例を示します。
|
いいえ |
または
|
現在のタイムスタンプに解決されます。 例:
|
はい |
|
null文字列に解決されます。 |
はい |
|
カスタム値リゾルバを記述することが可能です。 |
実装によって異なります。 |
テンプレートの例
テンプレートの構成値の例と解決される値を次に示します。
テンプレートの例 | 解決される値 |
---|---|
|
|
|
|
|
|
親トピック: Kinesis Streamsハンドラの設定および実行
13.3.4 KinesisハンドラでのクライアントIDとシークレットの構成
クライアントIDおよびシークレットは、Kinesis StreamsハンドラがAmazon Kinesisとやりとりするために必要な資格証明です。クライアントIDおよびシークレットは、Amazon AWSのWebサイトによって生成されます。これらの資格証明の取得とKinesisサーバーへの提示は、AWS Kinesis Java SDKによってクライアント側で実行されます。AWS Kinesis Java SDKにより、実行時にクライアントIDおよびシークレットを解決できる複数の方法が提供されます。
クライアントIDおよびシークレットを設定できます
-
Javaプロパティとして、Javaアダプタのプロパティ・ファイルに次のように1行で設定します。
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Daws.accessKeyId=your_access_key -Daws.secretKey=your_secret_key
-
環境変数として
AWS_ACCESS_KEY_ID
およびAWS_SECRET_ACCESS_KEY
変数を使用します。 -
ローカル・マシン上のE2C環境で。
親トピック: Kinesis Streamsハンドラの設定および実行
13.3.5 Kinesis Streamsハンドラ用のプロキシ・サーバーの構成
Oracle GoldenGateは、プロキシ・サーバーを使用可能にする次のパラメータを使用して、プロキシ・サーバーとともに使用できます。
gg.handler.name.proxyServer=
-
gg.handler.name.proxyPort=80
資格証明および次の構成パラメータを使用して、プロキシ・サーバーへのアクセスを保護できます。
gg.handler.name.proxyUsername=username
gg.handler.name.proxyPassword=password
構成例:
gg.handlerlist=kinesis
gg.handler.kinesis.type=kinesis_streams
gg.handler.kinesis.mode=op
gg.handler.kinesis.format=json
gg.handler.kinesis.region=us-west-2
gg.handler.kinesis.partitionMappingTemplate=TestPartitionName
gg.handler.kinesis.streamMappingTemplate=TestStream
gg.handler.kinesis.deferFlushAtTxCommit=true
gg.handler.kinesis.deferFlushOpCount=1000
gg.handler.kinesis.formatPerOp=true
#gg.handler.kinesis.customMessageGrouper=oracle.goldengate.handler.kinesis.KinesisJsonTxMessageGrouper
gg.handler.kinesis.proxyServer=www-proxy.myhost.com
gg.handler.kinesis.proxyPort=80
親トピック: Kinesis Streamsハンドラの設定および実行
13.3.6 Kinesis Streamsハンドラでのセキュリティの構成
AWS Kinesis Java SDKでは、HTTPSを使用してKinesisと通信します。Kinesis Streamsハンドラは、信頼できる証明書を使用して実行時にクライアントIDおよびシークレットの資格証明を提示することによって認証されます。
Kinesis Streamsハンドラは、相互認証を提供するサーバーを認証するように構成することもできます。これを行うには、Amazon AWSのWebサイトから証明書を生成し、サーバー認証を構成します。Oracle GoldenGate for Big Dataをホストしているマシンでは、トラスト・ストアを生成する必要があります。トラスト・ストアおよびトラスト・ストアのパスワードは、Kinesis StreamsハンドラのJavaアダプタのプロパティ・ファイルで構成する必要があります。
構成例を次に示します。
javawriter.bootoptions=-Xmx512m -Xms32m
-Djava.class.path=ggjava/ggjava.jar
–Djavax.net.ssl.trustStore=path_to_trust_store_file
–Djavax.net.ssl.trustStorePassword=trust_store_password
親トピック: Kinesis Streamsハンドラの設定および実行
13.4 Kinesisハンドラのパフォーマンスに関する考慮事項
トピック:
13.4.1 Kinesis Streamsの入力制限
1つのシャードを持つKinesisストリームへの最大書込み速度は、1秒当たり1000メッセージ、1秒当たり最大1MBのデータまでです。Kinesisストリームを追加したり、ストリームにシャードを追加することで、Kinesisへの入力をスケーリングできます。ストリームとシャードの両方を追加すると、Kinesisの入力容量が直線的に増加するため、Oracle GoldenGate Kinesis Streamsハンドラのパフォーマンスが向上します。
ストリームやシャードを追加すると、次のような潜在的なスループットが直線的に増加します。
-
2つのシャードを持つ1つのストリーム = 2000メッセージ/秒、合計データ・サイズは2MB/秒。
-
それぞれが1つのシャードを持つ3つのストリーム = 3000メッセージ/秒、合計データ・サイズは3MB/秒。
ストリームおよびシャードを十分に活用するには、Oracle GoldenGate Kinesis Streamsハンドラを構成して、ストリームとシャード間でできるだけ均等にメッセージを配信する必要があります。
すべてのデータが静的パーティション・キーを使用して単一のKinesisストリームに送信される場合、Kinesisストリームまたはシャードを追加してもKinesis入力のスケーリングはできません。Kinesisストリームは、選択されたマッピング方法を使用して実行時に解決されます。たとえば、ソース表名をKinesisストリーム名としてマッピングすると、ソース証跡ファイルからの操作が表全体に均等に分散されている場合、Kinesisストリーム間でメッセージを良好に配信できます。シャードは、パーティション・キーのハッシュによって選択されます。パーティション・キーは、選択されたマッピング方法を使用して実行時に解決されます。したがって、シャード全体の良好なメッセージ配信を保証するため、めまぐるしく変更されるパーティション・キーに対するマッピング方法を選択するのが最善です。
親トピック: Kinesisハンドラのパフォーマンスに関する考慮事項
13.4.2 トランザクションのバッチ処理
Oracle GoldenGate Kinesis Streamsハンドラは、メッセージを受信すると、同期HTTPSコールを介してKinesisに送信する前に、受信したメッセージをKinesisストリームによって1つにまとめてバッチ処理します。トランザクションのコミット時、すべての未処理メッセージがKinesisにフラッシュされます。Kinesisへのフラッシュ・コールはパフォーマンスに影響します。したがって、フラッシュ・コールを遅延するとパフォーマンスが大幅に向上します。
フラッシュ・コールを遅延するために推奨される方法は、Replicatの構成でGROUPTRANSOPS
構成を使用することです。GROUPTRANSOPS
は、複数の小規模なトランザクションを1つの大きなトランザクションにグループ化して、大きなトランザクションが完了するまでトランザクション・コミットのコールを遅延します。GROUPTRANSOPS
パラメータはデータベース操作(挿入、更新および削除)をカウントすることによって機能し、その操作数がGROUPTRANSOPS
構成の設定値以上になった場合にのみ、トランザクション・グループをコミットします。ReplicatのデフォルトのGROUPTRANSOPS
設定は1000です。
GROUPTRANSOPS
設定を大きな値に設定すると、Kinesisへの暫定的なフラッシュが必要になることがあります。Kinesisストリームのバッチ・メッセージを送信する個々のコールは、500個の個別メッセージまたは5MBを超えることはできません。保留中のメッセージの数がストリームごとに500メッセージまたは5MBを超える場合、Kinesisハンドラでは暫定的なフラッシュの実行が必要になります。
親トピック: Kinesisハンドラのパフォーマンスに関する考慮事項
13.4.3 トランザクション・コミット時のフラッシュの遅延
メッセージは、デフォルトでトランザクションのコミット時にKinesisにフラッシュされ、書込み永続性が保証されます。ただし、トランザクションのコミットを超えてフラッシュを遅延することが可能です。これは、ユーザーがトランザクションを1つのメッセージング単位として取得しようとしていて、メッセージがグループ化されてトランザクション・レベルでKinesisに送信される場合(つまり、1つのトランザクション = 1つのKinesisメッセージまたは少数のKinesisメッセージに分割された場合)にのみ推奨されます。
これには、GROUPTRANSOPS
レプリケーション・パラメータを1に設定して、ソース証跡ファイルからの複数の小さなトランザクションをより大きな出力トランザクションにグループ化しないようにする必要があります。これは、トランザクションごとに1つまたは少数のメッセージしか送信されず、トランザクションのコミット・コールが呼び出され、それによりKinesisへのフラッシュ・コールがトリガーされるため、パフォーマンスに影響を与える可能性があります。
良好なパフォーマンスを維持するため、Oracle GoldenGate Kinesis Streamsハンドラでは、ユーザーはKinesisのフラッシュ・コールをトランザクションのコミット・コールを超えて遅延できます。Oracle GoldenGateのReplicatプロセスでは、チェックポイントは{GoldenGate Home}/dirchk
ディレクトリの.cpr
ファイルに保持されます。Javaアダプタも、このディレクトリに.cpj
という名前のチェックポイント・ファイルを保持します。Replicatのチェックポイントは、Oracle GoldenGate Kinesisハンドラではメッセージ損失が発生しないことを保証できないチェックポイントを超えるものです。ただし、この操作モードでは、GoldenGate Kinesis Streamsハンドラにより.cpj
ファイルに正しいチェックポイントが保持されます。このモードで実行している場合、.cpj
ファイル内のチェックポイントが.cpr
ファイル内のチェックポイントの前にあれば、再起動時にそのチェックポイントが解析されて、クラッシュしてもメッセージが失われることはありません。
親トピック: Kinesisハンドラのパフォーマンスに関する考慮事項
13.5 トラブルシューティング
トピック:
13.5.1 Javaクラスパス
最も一般的な初期エラーは、必要なすべてのAWS Kinesis Java SDKクライアント・ライブラリを含めるクラスパスが正しくないことで、ログ・ファイルにClassNotFound
例外が作成されます。
Javaアダプタ・ロギングをDEBUG
に設定してトラブルシューティングを行い、プロセスを再実行できます。デバッグ・レベルでは、ロギングにgg.classpath
構成変数からクラスパスに追加されたJARに関する情報が含まれます。
gg.classpath
変数では、構成されたディレクトリ内のすべてのJARを選択するワイルドカードのアスタリスク(*
)文字がサポートされます。たとえば、/usr/kinesis/sdk/*
です(「Kinesis Streamsハンドラの設定および実行」を参照)。
親トピック: トラブルシューティング
13.5.2 Kinesisハンドラの接続に関する問題
オンプレミスで実行していて、Kinesis StreamsハンドラがKinesisに接続できない場合は、プロキシ・サーバーによってパブリック・インターネットへの接続が保護されている可能性があります。プロキシ・サーバーは、企業のプライベート・ネットワークとパブリック・インターネットとの間のゲートウェイとして機能します。ネットワーク管理者に連絡して、プロキシ・サーバーのURLを取得し、「Kinesis Streamsハンドラ用のプロキシ・サーバーの構成」の指示に従ってください。
親トピック: トラブルシューティング
13.5.3 ロギング
Kinesis Streamsハンドラでは、その構成状態がJavaのログ・ファイルに記録されます。
これは、ハンドラの構成値を確認できるため、役に立ちます。構成状態のロギングのサンプルを次に示します。
**** Begin Kinesis Streams Handler - Configuration Summary **** Mode of operation is set to op. The AWS region name is set to [us-west-2]. A proxy server has been set to [www-proxy.us.oracle.com] using port [80]. The Kinesis Streams Handler will flush to Kinesis at transaction commit. Messages from the GoldenGate source trail file will be sent at the operation level. One operation = One Kinesis Message The stream mapping template of [${fullyQualifiedTableName}] resolves to [fully qualified table name]. The partition mapping template of [${primaryKeys}] resolves to [primary keys]. **** End Kinesis Streams Handler - Configuration Summary ****
親トピック: トラブルシューティング