22 Oracle SQL Access to Kafka
Oracle Database 23ai以降、Oracle SQL APIを使用して、Oracle SQLによってKafkaトピックを動的に問い合せることができます。
Oracle SQL Access to Kafkaは、いくつかの重要な方法で、KafkaおよびOCI Streaming ServiceストリームをOracle Database 23aiと統合します。最初に、Oracle Databaseを1つ以上のKafkaトピックに接続できます。データベースが接続された後、Oracle DatabaseでKafkaデータを永続化することなく、Oracle SQLを使用してそのトピックを動的に問い合せることができます。この機能を使用すると、Oracle Databaseで取得されたデータと組み合せてリアルタイム・データを分析できます。また、Oracle SQL Access to Kafkaにより、KafkaトピックをOracle Databaseに損失なく迅速かつスケーラブルにロードできます。DBMS_KAFKA
APIは、このプロセス全体の管理を簡略化します。
- Oracle SQL Access to Kafkaバージョン2について
Oracle SQL Access to Kafka (OSaK
)には、Oracle SQLがKafkaトピックを問い合せることを可能にするOracle Databaseのネイティブ機能が用意されています。 - Oracle SQL Access to Kafkaのグローバル表およびビュー
Oracle SQL Access to KafkaがKafka STREAMING、SEEKINGおよびLOADアプリケーションにアクセスする方法、およびグローバル一時表で使用される一意のORA$
接頭辞について学習します。 - Oracle SQL Access to Kafka問合せの実行方法の理解
Oracle SQL Access to KafkaはKafkaストリーミング・データにアクセスしますが、問合せはOracle Databaseグローバル一時表で実行されるため、いくつかの利点があります。 - Oracle DatabaseへのKafkaデータのストリーミング
Oracle SQL Access to Kafkaを使用すると、標準のSQLセマンティクスを使用してKafkaストリーミング・データをOracle Database表で処理できます。 - タイムスタンプによるKafkaデータ・レコードの問合せ
Oracle SQL Access to Kafkaのシーク可能モードは、Kafkaデータに関連付けられたタイムスタンプに基づいて、Kafkaに格納されている古いデータを問い合せる上で役に立ちます。 - Kafkaデータベース管理者ロールについて
Oracle SQL Access to Kafkaを管理するには、Oracle DatabaseロールOSAK_ADMIN_ROLE
を付与し、管理者ロールおよびKafka管理APIパッケージに必要な管理権限を付与します。 - ユーザーへのKafkaデータベース・アクセスの有効化
アプリケーション・ユーザー・アカウントには、OSAKへのアクセスに必要なDBMS_KAFKA
データベース権限が付与されます。 - Oracle SQL Access to Kafkaでサポートされるデータ形式
Oracle SQL Access to Kafkaでは、区切りテキスト・データ(csvなど)、JSONおよびAvroの3つの形式で表されるKafkaレコードがサポートされます - Kafkaクラスタへのアクセスの構成
保護されたKafkaクラスタまたは保護されていないKafkaクラスタへのアクセスを構成できます - Oracle SQL Access to Kafkaアプリケーションの作成
Apacheクラスタ・データにアクセスするアプリケーションを作成するには、必要なアプリケーションのタイプを作成します。 - Kafkaクラスタ接続のセキュリティ
Oracle SQL Access to Kafkaは、SSL、SASL、Kerberosなどの様々なセキュリティ・メカニズムを使用して、KafkaおよびOracle Streaming Service (OSS)へのアクセスをサポートします。 - セキュアでないKafkaクラスタへのアクセスの構成
セキュアでないKafkaクラスタへのアクセスを構成するには、OSAK管理者(osak_admin_roleを持つOracle Databaseユーザー)がこの手順を完了する必要があります。 - セキュアなKafkaクラスタへのアクセスの構成
セキュアなKafkaクラスタへのアクセスを構成するには、この手順を使用します。 - Oracle SQL Access to Kafkaクラスタの管理
Oracle SQL Access to Kafkaを使用してKafkaクラスタ定義を更新、一時的に無効化および削除する方法を確認してください - Oracle SQL Access to KafkaでのKafkaデータの使用に関するガイドライン
アプリケーション開発計画の一部としてガイドライン、制限および推奨事項を確認します。 - アプリケーション用のKafkaクラスタ・アクセス・モードの選択
Oracle SQL Access to Kafkaを使用するには、アプリケーションに必要なデータ・アクセスのモードを決定します。 - Oracle SQL Access to Kafkaアプリケーションの作成
LOADアプリケーションでKafkaデータを問い合せるには、次の手順を使用してKafkaデータをOracle Database表にロードします。 - アプリケーションでのKafkaクラスタ・アクセスの使用
アプリケーションでのKafkaクラスタ・データ・アクセスの使用方法について学習します。
親トピック: その他のユーティリティ
22.1 Oracle SQL Access to Kafkaバージョン2について
Oracle SQL Access to Kafka (OSaK
)には、Oracle SQLからKafkaトピックへの問合せを実行できるOracle Databaseのネイティブ機能が用意されています。
Oracle Database 23ai以降では、Oracle SQL Access to Kafkaのバージョン2がOracle Databaseとともにインストールされます。これは、KafkaクラスタにネイティブのOracle Databaseコネクタ・サービスを提供します。これは、DBMS_KAFKA
およびDBMS_KAFKA_ADM
パッケージを介してアクセスされる一連の機能で構成されています。
機能
Oracle SQL Access to Kafkaバージョン2では、標準のOracle Database SQLセマンティクスを使用してKafkaストリーミング・データをOracle Database表とともに処理でき、標準のOracleアプリケーション・ロジック(Oracle JDBCアプリケーションなど)によってデータを処理できます。Oracle SQL Access to Kafkaは、Oracle Databaseに統合されています。このOracle DatabaseでのKafkaアクセスの統合により、外部クライアント・コネクタ・アプリケーションを必要とせずに、KafkaまたはOCI Streaming Serviceによって生成されたデータ・ストリームを使用して、Oracle Database内の表を関連付けることができます。Oracle SQL Access to Kafkaは、Kafkaアプリケーションと同じ方法でOracle Databaseのデータ・ストリームをスケールアップできます。
Oracle SQL Access to Kafkaを使用すると、次を実行できます:
- ストリーミング・アプリケーションを作成して使用し、未読のKafkaレコードを1回処理します。これらのレコードは、処理後に保持する必要がありません。
- ロード・アプリケーションを作成して使用し、様々なOracle Applicationsによるアクセスのために、未読のKafkaレコードをOracle Database表に永続的に取得します。この場合、Kafkaレコードは取得され、Oracle Databaseのユーザー表に保持されます。このユースケースは、データ・ウェアハウスに役立ちます。
- ユーザーが指定したタイムスタンプ間隔に基づいて、シーク・アプリケーションを作成して使用し、Kafkaトピック内のレコードを再度読み取ります。
- 2つ以上のストリーミング・アプリケーションを作成して使用します。これらのアプリケーションは、2つ以上のKafkaトピックからデータをストリーミングするために使用でき、そこでOracle DatabaseのSQLを使用してそれらを結合できます。
仕組み
Oracle SQL Access to Kafkaバージョン2では、Oracle Databaseシステム生成ビューおよび外部表を使用してKafkaデータにアクセスできます。これらのビューおよび外部表は、DBMS_KAFKAパッケージを使用して、Kafkaアプリケーションへの名前付きOracle SQLアクセスを定義します。通常、これらのビューおよび外部表は、アプリケーションのストリーミング、ロードおよびシークに対して透過的です。
アプリケーションは、操作をOracle Databaseトランザクションとして実行および制御し、データベースのACID (原子性、一貫性、独立性、耐久性)要件に準拠して、トランザクションごとに一意の識別子(トランザクションID)を使用してトランザクションのすべての部分がコミットされるか、すべてがロールバックされるようにします。このトランザクションIDには、エラーの識別およびロールバックに使用できるタイムスタンプが含まれます。Oracle DatabaseトランザクションのACID機能は、障害が発生した場合にレコードを失ったり繰り返したりすることなく、データ・リカバリをサポートします。
Oracle SQL Access to Kafkaで実行されるOracleトランザクションには、Kafkaパーティション・オフセットの管理、およびOracle Databaseのデータベース・メタデータ表へのコミットが含まれます。
Oracle SQL Access to Kafkaがない場合、Kafkaパーティション・オフセットは、アプリケーションまたはKafkaのいずれかで管理する必要がありますが、いずれもトランザクション・セマンティクスをサポートしていません。つまり、システム障害の後、Kafkaレコードが失われるか、アプリケーションによって再処理される可能性があります。Oracle Databaseトランザクションでオフセットを管理すると、これらの問題が回避され、Kafkaデータの独立性と耐久性が向上します。
Oracle SQL Access to KafkaはOracle Databaseで使用でき、PL/SQLおよびSQL問合せとともに使用されるため、Oracle Databaseへのコネクタを提供するために外部クライアント・アプリケーションは必要ありません。
ORA_KAFKA
PL/SQLパッケージには、データベース・スキーマへのKafkaクラスタの登録、Kafkaトピックの問合せ、指定されたオフセットまたは指定されたタイムスタンプからのデータの問合せなどを行う関数およびプロシージャがあります。データを格納せずにグローバル一時表を使用するか、ターゲットOracle Databaseのユーザー表にデータを格納するかを選択できます。
使用方法
Oracle SQL Access to Kafkaアプリケーションを使用して、Oracle Databaseで作成されたグローバル一時表またはユーザー表にアクセスし、アプリケーションからデータを取得できます。そのデータは、データのストリーム、または他のデータベースからのデータのスナップショットである場合があり、直接アクセスすることも、Oracle Database表にロードしてアプリケーション内で使用することもできます。
Kafkaグローバル一時表には次の特性があります:
- グローバル一時表は、アプリケーション・インスタンスの初期に1回ロードされ、アプリケーション・インスタンスの期間中、Kafkaレコードのスナップショットとして使用されます。アプリケーションでは、グローバル一時表とともに標準のOracle SQLを使用できます。
- グローバル一時表からの各問合せの結果、Kafkaクラスタにトリップされ、同じ行および場合によっては追加行が再取得されます。
対応するグローバル一時表は、Oracle SQL Access to Kafkaビューからのスナップショットを受信します。アプリケーションは、トランザクション内の1つ以上のクエリーにこの一時表を使用します。つまり、グローバル一時表は一度ロードされ、使用されます。Kafkaオフセットが拡張され、アプリケーションがコミットされます。これは、グローバル一時表にロードされたKafkaレコードを使用して終了されることを示します。
一時表からの読取りは、次の理由のために有益です:
- 繰返し可能な読取りは、複数の問合せから明示的に、または結合内で暗黙的にサポートされます
- 問合せオプティマイザの信頼できる統計が収集されます
- 一時表のロード時にKafkaに対して行われるトリップは1回のみです。後続の問合せでは、Kafkaクラスタへのトリップは行われません。
- グローバル一時表は、標準のOracle表と結合できます。Oracle SQL Access to Kafka一時表をOracle Database表に結合すると、KafkaデータでOracle Database機能を使用する機能が向上します。
- Oracle Databaseの成熟した最適化方針および処理方針を活用して、表の効率的な結合に必要なコード・パスを最小化できます。
親トピック: Oracle SQL Access to Kafka
22.2 Oracle SQL Access to Kafkaのグローバル表およびビュー
Oracle SQL Access to KafkaがKafka STREAMING、SEEKINGおよびLOADアプリケーションにアクセスする方法、およびグローバル一時表で使用される一意のORA$
接頭辞について学習します。
KafkaトピックのSTREAMING
およびSEEKING
にOracle SQL Access to Kafka (OSAK)を使用するアプリケーションは、PL/SQLを使用してOSAKプロシージャをコールし、対応するOracle SQL Access to Kafkaビューからの問合せの結果を使用してグローバル一時表をロードします。LOAD
アプリケーションは、EXECUTE_LOAD_APP
プロシージャを使用して既存のOracle Database表に増分ロードを実行するため、LOADアプリケーションではグローバル一時表は必要ありません。STREAMING、SEEKINGおよびLOADアプリケーションの場合、OSAKは3つのすべての場合でビューおよび外部表を作成します。
Oracle SQL Access to Kafkaビューと一時表の両方に、Oracle SQL Access to Kafkaによって作成されたオブジェクトとしてこれらを識別する一意のORA$
接頭辞があります。
ORA$DKV
(ビューの場合)およびORA$DKX
(表の場合)は、Oracle SQL Access to Kafkaによって生成されたビューおよび外部表の接頭辞で、Kafkaからユーザー所有の表またはグローバル一時表にデータをロードするためにDBMS_KAFKA
をコールします。通常、これらのビューおよび外部表は内部オブジェクトとして扱われ、Oracleアプリケーションによって直接操作されることはありません。
ORA$DKVGTT
は、ストリーミングまたはシーク・アプリケーションからロードされるグローバル一時表であることを示す接頭辞です。このグローバル一時表は、DBMS_KAFKA.LOAD_TEMP_TABLE
のコール時に透過的にロードされます。
親トピック: Oracle SQL Access to Kafka
22.3 Oracle SQL Access to Kafka問合せの実行方法の理解
Oracle SQL Access to KafkaはKafkaストリーミング・データにアクセスしますが、問合せはOracle Databaseグローバル一時表で実行されるため、いくつかの利点があります。
一般的なアプリケーションは、Oracle SQL Access to Kafkaビューを直接問い合せません。かわりに、次のようにします:
- Oracle SQL Access to Kafkaビューからの各問合せでは、現在のオフセットから現在の高水位標までKafkaから直接データをフェッチします。行は継続的に追加されるため、ビューからの各問合せにより、より多くの行が取得される可能性が高くなります。したがって、Oracle SQL Access to Kafkaビューでは、複数の問合せから明示的に、または結合内で暗黙的に、繰り返し可能な読取りはサポートされていません。
- 問合せオプティマイザのためにOracle SQL Access to Kafkaビューから収集された信頼できる統計はありません
- Oracle SQL Access to Kafka表からの各問合せの結果、Kafkaクラスタにトリップされ、同じ行および場合によっては追加行が再取得されます。これらの問合せ取得はパフォーマンスに影響する可能性があります。
対応する一時表は、Oracle SQL Access to Kafkaビューからのスナップショットを受信します。アプリケーションは、トランザクション内の1つ以上の問合せにこの一時表を使用します。一時表からの読取りは、次の理由のために有益です:
- 繰返し可能な読取りは、複数の問合せから明示的に、または結合内で暗黙的にサポートされます
- 問合せオプティマイザの信頼できる統計が収集されます
- 一時表のロード時にKafkaに対して行われる読取りは1回のみです。後続の問合せでは、データにアクセスするためにKafkaクラスタに戻る必要はありません。
グローバル一時表は、標準のOracle表(Oracle Customer Relationship Management (CRM)表など)と結合できます。
Oracle SQL Access to Kafka一時表をOracle Database表に結合すると、次の利点が得られます:
- Oracle Databaseの成熟した最適化方針および実行方針を活用して、表の効率的な結合に必要なコード・パスを最小化する
- Oracle Database ACIDトランザクション処理(原子性、一貫性、独立性および耐久性)のセキュリティを備えたOracle Databaseトランザクション・セマンティクスを取得し、データに対するすべての変更がアプリケーションによって制御された単一の操作であるかのように実行されるようにする
- Kafkaパーティション・オフセットを管理し、Oracle Databaseのデータベース・メタデータ表にコミットすることで、システム障害が発生した後、Kafkaレコードを含むこれらのOracle Databaseトランザクションが失われたりアプリケーションによって再処理されないようにする。
親トピック: Oracle SQL Access to Kafka
22.4 Oracle DatabaseへのKafkaデータのストリーミング
Oracle SQL Access to Kafkaを使用すると、標準のSQLセマンティクスを使用してKafkaストリーミング・データをOracle Database表で処理できます。
Apache Kafkaは、多くのストリーミング・ソースからデータを取得および統合し、このデータに対して分析を実行できるようにするために一般的に使用されます。通常、これを行うには、すべてのKafkaレコードをデータベースにロードし、このデータを分析のためにデータベース表と組み合せる必要があります(短期調査または長期分析用)。
Oracle SQL Access to Kafkaでは、標準のSQL、PL/SQLおよびその他のデータベース開発ツールを使用して、KafkaからOracle Databaseへのロードを実現し、JDBCアプリケーションなどの標準のOracleアプリケーション・ロジックを使用してこのデータを処理できます。Oracle SQL Access to Kafkaでは、ロードするKafkaトピックのすべてのパーティションにマップするビューを作成できます。より多くのデータをロードするためのOracle SQL Access to Kafkaの各コールにより、このビューが問い合せられ、その結果、最後に読み取られた前の時点から現在のデータの高水位標オフセット(すべてのKafkaパーティションに完全に挿入された最後のメッセージのオフセット)までのKafkaトピックのすべてのパーティションが問い合せられます。Kafkaパーティションから取得されたデータは、一時的なOracle Database表にロードされます。
これらのOracle SQL Access to Kafkaビューは、Kafkaアプリケーション・インスタンスのように動作します。高水位標オフセットに達するまで、特定のオフセットから始まるKafkaからレコードを読み取ります
Oracle SQL Access to Kafkaでビューが作成されると、対応するグローバル一時表も作成されます。アプリケーションにより、Oracle SQL Access to Kafka PL/SQLプロシージャがコールされ、このグローバル一時表が、対応するOracle SQL Access to Kafkaビューからの問合せの結果とともにロードされます。
グローバル一時表は、標準のOracle表(Oracle Customer Relationship Management (CRM)表など)と結合できます。
Oracle SQL Access to Kafka一時表をOracle Database表に結合すると、次の利点が得られます:
- Oracle Databaseの成熟した最適化方針および実行方針を活用して、表の効率的な結合に必要なコード・パスを最小化する
- Oracle Database ACIDトランザクション処理(原子性、一貫性、独立性および耐久性)のセキュリティを備えたOracle Databaseトランザクション・セマンティクスを取得し、データに対するすべての変更がアプリケーションによって制御された単一の操作であるかのように実行されるようにする
- Kafkaパーティション・オフセットを管理し、Oracle Databaseのデータベース・メタデータ表にコミットすることで、システム障害が発生した後、Kafkaレコードを含むこれらのOracle Databaseトランザクションが失われたりアプリケーションによって再処理されないようにする。
親トピック: Oracle SQL Access to Kafka
22.5 タイムスタンプによるKafkaデータ・レコードの問合せ
Oracle SQL Access to Kafkaのシーク可能モードは、Kafkaデータに関連付けられたタイムスタンプに基づいて、Kafkaに格納されている古いデータを問い合せる上で役に立ちます。
異常が発生した場合は、Oracle SQL Access to Kafkaを使用して、指定した時間範囲内の異常に関連するKafkaデータの識別を支援できます。
たとえば、コンピュータ会社に複数のサイトがあるとします。各サイトにはラボがあり、建物およびラボへのアクセスはすべてキー・カード・アクセスによって保護されています。同社には膨大な数の従業員がおり、オフィス・スペースを必要とする人もいれば、ラボでマシンを維持している人もいれば、換気の問題、許可されていないアクセス、サイトの一般的な使用などの問題について建物を監視している人もいます。このシナリオでは、Kafkaトピックは次のもので構成されている場合があります:
- キー・カードの使用(
KCdata
) - 施設の監視(
Fdata
) - 稼働時間、アクセス、侵入検出などのシステム監視(
Sdata
)
Kafkaデータを読み取ってOracleデータと組み合せる間に通常のイベントが検出された場合、アプリケーションは、異常なイベントを含むレコードのタイムスタンプとともに異常をログに記録できます。次に、2つ目のアプリケーションが、これらのエラーを読み取って処理できます。アプリケーションは、異常なイベントごとに、イベントの前後10秒のタイムスタンプの範囲をシークする場合があります。これは、ログ・ファイル内の例外の分析に似ています。イベントの前後にログ・エントリを参照して、例外が以前の問題によって引き起こされたかどうか、または例外によってダウンストリームの問題が発生したかどうかを確認することが一般的です。
サイトの問題を評価するには、キー・カード・リーダー・データ(KCdata
)を永続表にロードできます。たとえば、複数のアプリケーションでこのデータを使用する場合は、複数のアプリケーションで使用できるOracle Database表にその日付をロードして、不動産チームが建物とオフィスの使用状況を追跡できるようにすることが道理にかなっています。IT部門は、このデータを使用して、問題を処理するためにサイトにいるユーザーを判別します。
ストリーミング問合せを使用すると、施設データ(Fdata
)をスキャンして、データに非定型または異常なイベントがあるかどうかを判断できます。これには、研究室温の急上昇、閉まらなかったためにアラームを発生させているドア、アラームを鳴らしている火災検知システム、または時間枠に関連付けられたその他のデータ・ポイント(たとえば、半開きにしたままのドア)などがあります。
セキュリティ・チームには、閉じられなかったドアに関するアラートが表示されます。このチームは、ストリーミング・データを使用して、午前3時17分にドアが半開きのままにされたことを確認します。その後、Seekingクエリーを使用して、30分間のウィンドウ(3:02から3:32)で他の複数のデータ・ポイント(KCdata、Fdata、Sdata)を探し、建物にアクセスしたユーザー、どのドアまたはラボにアクセスしたか、オフラインになったマシンまたは直接アクセスされたマシン、およびその他のデータ・レコードを決定できるため、開発者の状況に適切な対応ができます。
このシナリオでは、Oracle SQL Access to Kafkaを使用して、Kafkaトピックのすべてのパーティションにマップする単一のビューを作成できます。Oracle SQL Access to Kafkaでビューが作成されると、対応するグローバル一時表も作成されます。アプリケーションは、最初に開始および終了のタイムスタンプを指定してから、Oracle SQL Access to Kafkaをコールして、指定した時間範囲内の行を含むグローバル一時表をロードします。標準のOracle Database SQLトランザクション処理を活用して、大量のデータを解析し、異常なイベント内の関連するデバイス・データを識別できます。
親トピック: Oracle SQL Access to Kafka
22.6 Kafkaデータベース管理者ロールについて
Oracle SQL Access to Kafkaを管理するには、Oracle DatabaseロールOSAK_ADMIN_ROLE
を付与し、管理者ロールおよびKafka管理APIパッケージに必要な管理権限を付与します。
Oracleは、Oracle SQL access for Kafka管理権限を管理ユーザーに付与するロールベースの認証を提供するために、Oracle Database 23ai以降でOSAK_ADMIN_ROLE
を提供します。このロールは、Oracle SQL Access to Kafkaの管理者ユーザーに付与できます。このロールは、Kafkaクラスタを構成、登録および管理するためにOracle SQL access for Kafka管理者として指定するユーザーに必要なシステム権限を付与します。このロールによって付与されるシステム権限は次のとおりです:
- CREATE CREDENTIAL: Kafka SASL-SSL (Simple Authentication and Security Layer)パスワードまたはOSS (Oracle Streaming Service) authTokenを作成します
- CREATE ANY DIRECTORY: クラスタ・アクセスおよびクラスタ構成ディレクトリを作成します
- DROP ANY DIRECTORY: クラスタ・アクセスおよびクラスタ構成ディレクトリを削除します
sys.dbms_kafka_clusters
へのREAD権限sys.dbms_kafka_applications
へのREAD権限sys.dbms_kafka_messages
へのREAD権限
親トピック: Oracle SQL Access to Kafka
22.7 ユーザーへのKafkaデータベース・アクセスの有効化
アプリケーション・ユーザー・アカウントには、OSAKへのアクセスに必要なDBMS_KAFKA
データベース権限が付与されます。
DBAとして、Oracle SQL access for Kafkaを管理および使用する権限を作成してユーザーに付与します。ユーザーは、次の2つのカテゴリがあります:
-
Oracle SQL Access to Kafka管理者は特権ユーザーです。Oracle SQL Access to Kafkaの管理を簡略化するために、Oracle DBAが指定されたKafka管理者に
OSAK_ADMIN_ROLE
を付与することをお薦めします。このロールは、Oracle Database 23ai以降のデータベースに事前作成されます。管理者は、
DBMS_KAFKA_ADM
パッケージ・メソッドを実行して、Kafkaクラスタ情報を構成および管理します。OSAK_ADMIN_ROLE
またはOracle DBAのいずれかを付与されたユーザーは、オペレーティング・システム・レベルのクラスタ構成ディレクトリを作成し、そのディレクトリに構成ファイルを移入できます。Oracle SQL Access to Kafka管理者は、Kafkaクラスタ構成およびアクセス・ディレクトリのOracleディレクトリ・オブジェクトを作成します。 -
Kafkaトピック・データのアプリケーション・ユーザーには、Kafkaクラスタ・トピックからアクセスするデータにアクセスして使用できるように、
DBMS_KAFKA
パッケージへのアクセスに必要なREAD
権限が付与されます。
例22-1 Kafka管理者ユーザーへのOSAK_ADMIN_ROLEの付与
この例では、OSAK_ADMIN_ROLE
がユーザーkafka-admin
に付与されます:
GRANT OSAK_ADMIN_ROLE
TO kafka-admin;
例22-2 Kafkaユーザーへのユーザー・アクセス権の付与
アプリケーションがOracle SQL Access to Kafkaを使用できるようにするには、DBMS_KAFKA
アクセス権を付与します。これらのアプリケーション・ユーザーは、ソースKafkaクラスタおよびターゲットOracle Databaseに対する次の権限をすでに持っている必要があります:
CREATE SESSION
CREATE TABLE
CREATE VIEW
- Kafkaデータにアクセスする表領域で使用可能な割当て制限
- 登録済のKafkaクラスタのクラスタ・アクセス・ディレクトリに対する読取りアクセス
親トピック: Oracle SQL Access to Kafka
22.8 Oracle SQL Access to Kafkaでサポートされるデータ形式
Oracle SQL Access to Kafkaでは、区切りテキスト・データ(csvなど)、JSON、Avroの3つの形式で表されたKafkaレコードがサポートされます
Kafkaはスキーマを持たず、形式に依存しません。アプリケーション・データは、Kafkaレコードのキー・フィールドおよび値フィールドに不透明なバイト配列として格納されます。Kafkaキーは主にKafkaパーティションへのデータのハッシュに使用されるため、Kafkaレコードの値フィールドのみが取得され、Oracle行としてレンダリングされます。アプリケーションは、データのシリアライズとデシリアライズ、およびデータ形式の構造を定義するスキーマの提供を担当します。Oracle SQL Access for Kafkaでは、DBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP()
プロシージャのoptions引数にデータ形式およびスキーマが指定されます。
ノート:
形式タイプに関係なく、作成される表およびビューには、 KAFKA_PARTITION
、KAFKA_OFFSET
およびKAFKA_EPOCH_TIMESTAMP
の3つの追加列が含まれます。
- JSON形式およびOracle SQL Access to Kafka
JSONの場合、Oracle SQL Access to Kafkaにより、表またはビューの列が決定されます。 - 区切りテキスト形式およびOracle SQL Access to Kafka
区切りテキスト形式の場合、Oracle SQL Access to Kafkaにより、Kafkaデータを持つユーザー・スキーマにビューおよび一時表が作成されます。 - Avro形式とOracle SQL Access to Kafka
Avro形式の場合、Oracle SQL Access to Kafkaにより、Avroスキーマを使用してデータ列と3つのメタデータ列が決定されます。
親トピック: Oracle SQL Access to Kafka
22.8.1 JSON形式およびOracle SQL Access to Kafka
JSONの場合、Oracle SQL Access to Kafkaにより、表またはビューの列が決定されます。
次に、オプションを使用してJSONストリーミング・アプリケーションのデータを表示する例を示します:
DECLARE
v_options VARCHAR2;
BEGIN
v_options := ‘{"fmt" : "JSON"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
'ALPHA1',
'MYAPP',
'ExampleTopic',
v_options);
END;
/
Javascript Object Notation (JSON)データでは、Oracle SQL Access to Kafkaにより、Kafkaデータを介してユーザー・スキーマにビューおよびグローバル一時表が作成されます。これらのビューには、接頭辞ORA$DKV_
が付きます。一時表には、接頭辞ORA$DKVGTT_
が付きます。パッケージDBMS_KAFKA.CREATE_xxx_APP
は、固定スキーマを使用して、KafkaレコードからJSONデータを返します。
たとえば:
SQL> describe ORA$DKVGTT_ALPHA1_MYAPP_0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
VALUE VARCHAR2(4000)
VARCHAR2
型の場合、VALUE
列の長さは、データベースの最大varchar2
長によって制限されます。VALUE
列には、タイプCLOB
のオプションがあります。
KAFKA_
列は、KafkaレコードのパーティションID、オフセットおよびタイムスタンプを識別します。(基礎となるタイムスタンプ表現は、Unixエポック以降のミリ秒数を表す整数です。)
Kafkaレコードの値部分のデータは、VALUE列にテキストとして返されます。外部テキストの文字エンコーディングは、AL32UTF8
として固定されています。Oracle SQL Access to Kafkaロジックでは、VALUE
列の有効なJSON構文はチェックされません。ただし、SQL問合せのJSON演算子がVALUE
データを解析しようとすると、障害のあるJSONが検出されます。
22.8.2 区切りテキスト形式およびOracle SQL Access to Kafka
区切りテキスト形式の場合、Oracle SQL Access to Kafkaにより、Kafkaデータを持つユーザー・スキーマにビューおよび一時表が作成されます。
CSVやカンマ区切りデータなどの区切りデータの場合、Oracle SQL Access to Kafkaにより、Kafkaデータを介してユーザー・スキーマにビューおよびグローバル一時表が作成されます。これらのビューには、接頭辞ORA$DKV_
が付きます。一時表には、接頭辞ORA$DKVGTT_
が付きます。DSV形式の場合、データ列は、オプションで渡された参照表と3つのメタデータ列に基づきます
Oracle SQL Access to Kafkaのデリミタ付きテキスト形式を使用して作成された一時表およびビューには、Kafkaレコードの値フィールド内のデリミタ付きテキスト・データの形状を反映する列があります。Oracle SQL Access to Kafkaでは、テキスト・データは表およびビュー定義で表現されるネイティブOracleデータ型に変換されます。外部テキストの文字エンコーディングは、AL32UTF8
として固定されています。
Kafkaレコードが取得されると、正規レイアウトが作成されます。これは、Kafkaパーティション識別子(INTEGER
)、Kafkaレコード・オフセット(INTEGER
)およびKafkaレコード・タイムスタンプ(INTEGER
)で始まり、Kafka値内のデリミタ付きテキスト・データが続きます。つまり、Kafkaデータはフラット化され、ビュー・スキーマ定義の順序を使用して純粋なデリミタ付きテキスト・フィールドの行としてストリーミングされます。
次のOracleデータ型がサポートされています:
INTEGER
、INT
、NUMBER
CHAR
、VARCHAR2
NCHAR
、NVARCHAR2
CLOB
、NCLOB
、BLOB
FLOAT
、BINARY_FLOAT
、BINARY_DOUBLE
TIMESTAMP
、DATE
TIMESTAMP WITH TIME ZONE
、TIMESTAMP WITH LOCAL TIME ZONE
INTERVAL
RAW
BOOLEAN
アプリケーション作成時のデリミタ付きテキストの仕様を簡略化するには、Kafkaレコード値フィールドでユーザー・データの列が物理的に順序付けされた順序でこれらの列を記述する表の名前を指定します。Oracle SQL Access to Kafkaでは、その名前がビューおよび一時表で使用されます。
次の例は、Oracle SQL Access to Kafkaアプリケーションの作成時に指定されたデリミタ付きテキスト・データ表(参照表またはreftable)の形状を示しています。ここでも、Kafka値フィールドには、物理的な順序およびデリミタ付きテキストからの必要なデータ型変換が反映されています。
reftableは、形状を反映するOracle SQL Access to Kafkaのビューおよび一時表を作成するためにCREATE_xxx_APP
コールに使用された後、保持する必要があります。ビューを再作成するには、reftableが必要です。
SQL> describe FIVDTI_SHAPE;
Name Null? Type
----------------------------------------- -------- ----------------------------
F1 NUMBER
I2 NUMBER
V3 VARCHAR2(50)
D4 DATE
T5 TIMESTAMP(6)
V6 VARCHAR2(200)
I7 NUMBER
参照表には、Kafkaレコード値のフィールドのみが記述されています。たとえば、reftable FIVDTI_SHAPE
は、F1、I2、V3、D4、T5、V6、I7がKafkaレコード値のフィールドであるKafkaレコードをサポートできる場合があります。Kafkaレコード値のフィールドは、デリミタ(カンマ・デリミタなど)で区切る必要があります。
ノート:
参照表に非表示列を含めることはできません。列の順序は、Kafkaレコードのデータ値の順序と一致する必要があります。非表示列のCOLUMN_ID
はNULL
であるため、列リスト内での位置を特定できません。
FIVDTI_SHAPE
表に記述されたデータ用として作成されたOracle SQL Access to Kafka一時表には、次のスキーマが含まれます:
SQL> describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F1 NUMBER
I2 NUMBER
V3 VARCHAR2(50)
D4 DATE
T5 TIMESTAMP(6)
V6 VARCHAR2(200)
I7 NUMBER
22.8.3 Avro形式およびOracle SQL Access to Kafka
Avro形式の場合、Avroスキーマを使用してデータ列と3つのメタデータ列が決定されます。
- Oracle SQL Access to KafkaでのAvro形式の使用について
Oracle SQL Access to KafkaがAvro形式のKafkaデータをOracle Database表およびビューで使用できるようにする方法について説明します。 - Oracle SQL Access to KafkaでサポートされているAvroプリミティブ型
Apache Avroスキーマのプリミティブ型名をデータベースで使用するために、Oracleでは、これらの型をSQLデータ型に変換します。 - Oracle SQL Access to KafkaでサポートされているAvro複合型
Apache Avroスキーマの複合型名をデータベースで使用するために、Oracleでは、これらの型をサポートされているSQLデータ型に変換します。 - Oracle SQL Access to KafkaでサポートされているAvro論理的型
Apache Avroスキーマの論理的型名をデータベースで使用するために、Oracleでは、これらの型をサポートされているSQLデータ型に変換します。
22.8.3.1 Oracle SQL Access to KafkaでのAvro形式の使用について
Oracle SQL Access to KafkaがAvro形式のKafkaデータがOracle Databaseの表およびビューで使用できるようにする方法について説明します。
Oracle Databaseの表およびビューでアプリケーション別のApache Avro形式のデータを使用できるように、Oracle SQL Access for Kafkaでは、options引数で指定されたスキーマに基づくデータ形式をDBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP()
プロシージャに変換します。
Apache Avroレコードは、名前付きフィールドおよび型の順序付きリストです。レコードのスキーマは、データの構造およびその読取り方法を定義します。Avroスキーマは、Oracle SQL Access to Kafkaアプリケーションの作成時に渡す必要があります。つまり、Oracle SQL Access to Kafkaアプリケーションでは、Kafkaトピックに対して1つのAvroスキーマのみをサポートできます。トピック・ストリームで複数のスキーマ型を使用することはサポートされていません。スキーマが進化した場合、新しいOracle SQL Access to Kafkaアプリケーションを作成する必要があります。Oracle SQL Access to Kafkaは、Confluent Schema Registryをサポートしていません。Avro形式のKafkaレコードにConfluentヘッダーが含まれている場合、そのヘッダーは削除され、Oracle SQL Access to Kafkaによって無視されます。
Kafkaはスキーマを持たず、形式に依存しません。アプリケーション・データは、Apache Avroレコードのキー・フィールドおよび値フィールドに不透明なバイト配列として格納されます。Kafkaキーは主にKafkaパーティションへのデータのハッシュに使用されるため、Apache Avroレコードの値フィールドのみが取得され、Oracle Database表でOracle行としてレンダリングされます。アプリケーションは、データのシリアライズとデシリアライズ、およびデータ形式の構造を定義するスキーマの提供を担当します。
Oracle SQL Access to KafkaではAvroプリミティブ型と複合型の両方を使用できますが、アプリケーションごとに1つのタイプのみを使用できます。
22.8.3.2 Oracle SQL Access to KafkaでサポートされるAvroのプリミティブ型
Apache Avroスキーマのプリミティブ型名をデータベースで使用するために、Oracleでは、これらの型をSQLデータ型に変換します。
表22-1 Oracle SQL Access to KafkaのAvroプリミティブ型およびOracle型の変換
型の説明 | Avroプリミティブ型 | Oracle型 |
---|---|---|
null/値なし |
|
|
(該当なし) |
|
|
32ビット符号付き整数 |
|
|
64ビット符号付き整数 |
|
|
IEEE 32ビット浮動小数点 |
|
|
IEEE 64ビット浮動小数点 |
|
|
バイト配列/バイナリ |
|
|
UTF-8でエンコードされた文字列 |
|
|
次のAvroスキーマの例では、すべてのAvroプリミティブ型を使用するレコードを定義します:
{
"type" : "record",
"name" : "primitives",
"fields" : [
{ "name" : "f_null", "type" : "null" },
{ "name" : "f_boolean", "type" : "boolean" },
{ "name" : "f_int", "type" : "int"}
{ "name" : "f_long", "type" : "long" },
{ "name" : "f_float", "type" : "float" },
{ "name" : "f_double", "type" : "double" },
{ "name" : "f_bytes", "type" : "bytes" },
{ "name" : "f_string", "type" : "string"}
]
}
この例のAvroスキーマを使用してAvroデータ用のOracle SQL Access to Kafka一時表を作成した場合、一時表には次のスキーマが含まれます:
describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F_NULL CHAR(1)
F_BOOLEAN NUMBER(1)
F_INT NUMBER(38)
F_LONG NUMBER(38)
F_FLOAT BINARY_FLOAT
F_DOUBLE BINARY_DOUBLE
F_BYTES BLOB
F_STRING VARCHAR2(4000)
VARCHAR2
型の長さ(この例では、F_STRING
列用)は、データベースの最大varchar2
長によって決まります。
22.8.3.3 Oracle SQL Access to KafkaでサポートされるAvroの複雑な型
Apache Avroスキーマの複合型名をデータベースで使用するために、Oracleでは、これらの型をサポートされているSQLデータ型に変換します。
説明
Apache Avroの複合データ型は、指定された属性を取得します。Avro複合型を使用するために、次の表に示すように、これらはOracle型に変換されます。
表22-2 Oracle SQL Access to KafkaのAvro複合型およびOracle型の変換
Avro複合型 | Oracle型 | 型の説明 |
---|---|---|
|
|
fixed型は、バイナリ・データの格納に使用できる固定長フィールドの宣言に使用されます。フィールドの名前と1バイトのサイズの2つの必須属性があります。 |
|
|
Avro enumフィールド。 Avro enumは列挙型です。これは、型名が |
|
|
構造フィールド。 構造フィールドは、入力Avroレコードのフィールドに対応します。レコードは、すべてが結合されて1つのことを記述する属性のカプセル化を表します。 |
|
|
mapは、データをキーと値のペアとして編成する連想配列(またはディクショナリ)です。Avro mapのキーは文字列である必要があります。Avro mapでは、1つの属性(値)のみがサポートされます。この属性は必須であり、mapの値部分のタイプを定義します。 値には任意の型を使用できます。 |
|
|
任意の型のarray arrayタイプはarrayフィールドを定義します。これは必須のitems属性のみをサポートします。items属性は、array内のitemsの型を識別します。 |
ノート:
Avro複合型record
、map
およびarray
は、VARCHAR2
型への変換の前にJSON形式の文字列に変換されます。
次のAvroスキーマの例では、すべてのAvro複合型を使用するレコードを定義します:
{
"type" : "record",
"name" : "complex",
"fields" : [
{ "name" : "f_fixed",
"type" : { "type" : "fixed", "name" : "ten", "size" : 10}
},
{ "name" : "f_enum",
"type" : { "type" : "enum", "name" : "colors",
"symbols" : [ "red", "green", "blue" ] }
},
{ "name" : "f_record",
"type" : {"type" : "record", "name" : "person",
"fields" : [ { "name" : "first_name", "type" : "string" },
{ "name" : "last_name", "type" : "string"} ] }
},
{ "name" : "f_map",
"type" : { "type" : "map", "values" : "int" }
},
{ "name" : "f_array",
"type" : {"type" : "array", "items" : "string" }
}]
}
この例のAvroスキーマを使用してAvroデータ用のOracle SQL Access to Kafka一時表を作成した場合、一時表には次のスキーマが含まれます:
describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F_FIXED BLOB
F_ENUM VARCHAR2(4000)
F_RECORD VARCHAR2(4000)
F_MAP VARCHAR2(4000)
F_ARRAY VARCHAR2(4000)
VARCHAR2
型の長さ(この例では、F_ENUM
、F_RECORD
、F_MAP
およびF_ARRAY
列)は、データベースの最大varchar2
長によって決定されます。
22.8.3.4 Oracle SQL Access to KafkaでサポートされるAvroの論理型
Apache Avroスキーマの論理型名をデータベースで使用するために、Oracleでは、これらの型をサポートされているSQLデータ型に変換します。
説明
Avro論理型は、Avroプリミティブ型または複合型で、導出型を表す追加の属性があります。論理型は、次の表に示すようにOracle型に変換されます。
表22-3 Oracle SQL Access to KafkaのAvro複合型およびOracle型の変換
型の説明 | Avro論理型 | Oracle型 |
---|---|---|
10進数: スケールなし ×10-scale形式の任意の精度の符号付き10進数 |
decimal (bytes, fixed) |
|
UUID (汎用一意識別子)、GUIDS (グローバル一意識別子)とも呼ばれます: これらのIDは、RFC-4122に準拠してランダムに生成されます。 |
UUID (string) |
サポートされていません。 |
日付 特定のタイム・ゾーンや時間を参照しないカレンダ内の日付 Unixエポック(1970年1月1日)からの日数 |
date (int) |
|
時間(ミリ秒): 特定のカレンダ、タイム・ゾーンまたは日付を参照せずに、午前0時以降のミリ秒数として表される時刻: 00:00:00.000 |
time-millis (int) |
|
時間(マイクロ): 特定のカレンダ、タイム・ゾーンまたは日付を参照せずに、午前0時以降のマイクロ秒数として表される時刻: 00:00:00.000000 |
time-micros (long) |
|
タイムスタンプ(ミリ) UTC: 特定のタイム・ゾーンまたはカレンダに関係なく、Unixエポック(1970年1月1日)からのミリ秒数として表されるグローバル・タイムライン上の時点: 00:00:00.000 UTC |
timestampmillis (long) |
|
タイムスタンプ(マイクロ) UTC: 特定のタイム・ゾーンまたはカレンダに関係なく、Unixエポック(1970年1月1日)からのマイクロ秒数として表されるグローバル・タイムライン上の時点: 00:00:00.000000 UTC |
timestampmicros (long) |
|
時間 月数、日数およびミリ秒数で定義された時間。 |
fixed (size:12) |
サポートされていません。 |
ノート:
論理型time-millis、time-macros、timestampmillisおよびtimestampmicrosで使用される10進型は、バイト配列として内部的に格納されます(固定または非固定)。Avroライターによっては、これらの配列の中には小数点の文字列表現を格納するものもあれば、スケーリングされていない値を格納するものもあります。あいまいなデータを表示しないように、オプションavrodecimaltype
を使用して、使用する表現を明示的に宣言することをお薦めします。このオプションが明示的に指定されていない場合、Oracle SQL Access to Kafkaのデフォルト・オプションでは、スケーリングされていないデータの表現がファイルの10進列に格納されます。
次のAvroスキーマの例では、すべてのAvro論理型を使用するレコードを定義します:
{
"type" : "record",
"name" : "logical",
"fields" : [ {
"name" : "f_decimal",
"type" : {
"type" : "bytes",
"logicalType" : "decimal",
"precision" : 4,
"scale" : 2
}
}, {
"name" : "f_date",
"type" : {
"type" : "int",
"logicalType" : "date"
}
}, {
"name" : "f_time_millis",
"type" : {
"type" : "int",
"logicalType" : "time-millis"
}
}, {
"name" : "f_time_micros",
"type" : {
"type" : "long",
"logicalType" : "time-micros"
}
}, {
"name" : "f_timestamp_millis",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
}, {
"name" : "f_timestamp_micros",
"type" : {
"type" : "long",
"logicalType" : "timestamp-micros"
}
} ]
}
この例のAvroスキーマを使用してAvroデータ用のOracle SQL Access to Kafka一時表を作成した場合、一時表には次のスキーマが含まれます:
describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F_DECIMAL NUMBER
F_DATE DATE
F_TIME_MILLIS TIMESTAMP(3)
F_TIME_MICROS TIMESTAMP(6)
F_TIMESTAMP_MILLIS TIMESTAMP(3)
F_TIMESTAMP_MICROS TIMESTAMP(6)
22.9 Kafkaクラスタへのアクセスの構成
保護されているKafkaクラスタまたは保護されていないKafkaクラスタへのアクセスを構成できます
- クラスタ・アクセス・ディレクトリの作成
Oracle SQL Access to Kafka管理者は、クラスタへのデータベース・ユーザー・アクセスを制御するために、Kafkaクラスタごとにクラスタ・アクセス・ディレクトリ・オブジェクトを作成する必要があります。 - Kafka構成ファイル(osakafka.properties)
Kafkaクラスタにアクセスするには、Kafkaクラスタへのアクセスに必要な情報を含む構成ファイルを作成する必要があります。 - Kafka構成ファイルのプロパティ
ここで説明するプロパティは、Kafka構成ファイルosakafka.properties
で使用されます。 - クラスタ・アクセス・ディレクトリに必要なセキュリティ構成ファイル
セキュリティ・プロトコルに基づいて、必要な構成ファイルを識別します。
親トピック: Oracle SQL Access to Kafka
22.9.1 クラスタ・アクセス・ディレクトリの作成
Oracle SQL Access to Kafka管理者は、クラスタへのデータベース・ユーザー・アクセスを制御するために、Kafkaクラスタごとにクラスタ・アクセス・ディレクトリ・オブジェクトを作成する必要があります。
クラスタ・アクセス・ディレクトリは、Kafkaクラスタ構成ファイルを含むOracleディレクトリ・オブジェクトです。このディレクトリはすべてのクラスタに必要です。Kafkaクラスタにアクセスするには、Kafkaクラスタごとに独自のクラスタ・アクセス・ディレクトリが必要です。Oracle SQL Access to Kafka管理者として、クラスタ・アクセス・ディレクトリ・オブジェクトを作成し、Kafkaクラスタにアクセスする必要があるデータベース・ユーザーにこのディレクトリへのREAD
アクセス権を付与することで、Kafkaクラスタへのアクセスを管理します。DBMS_KAFKA_ADM.REGISTER_CLUSTER()
プロシージャをコールする前に、クラスタ・アクセス・ディレクトリを作成する必要があります。
例22-3 クラスタ・アクセス・ディレクトリ・オブジェクトの作成とREADアクセス権の付与
まず、クラスタ・アクセス・ディレクトリ・オブジェクトを作成します。この例では、オブジェクトはosak_kafkaclus1_access
です:
CREATE DIRECTORY osak_kafkaclus1_access AS '';;
Kafkaクラスタが正常に登録されたら、Oracle SQL Access to Kafka管理者は、このディレクトリに対するREADアクセス権をユーザーに付与します。
この例では、ユーザーexample_user
にosak_kafkaclus1_access
へのアクセス権が付与されています:
GRANT READ ON DIRECTORY osak_kafkaclus1_access TO example_user;
親トピック: Kafkaクラスタへのアクセスの構成
22.9.2 Kafka構成ファイル(osakafka.properties)
Kafkaクラスタにアクセスするには、Kafkaクラスタへのアクセスに必要な情報を含む構成ファイルを作成する必要があります。
- Kafka構成ファイルについて
osakafka.properties
ファイルには、保護されているKafkaクラスタへのアクセスに必要な構成情報と、Oracle SQL Access to Kafkaに関する追加情報が含まれています。 - Oracle SQL Access for Kafka構成ファイルのプロパティ
osakafka.properties
ファイルを作成するには、ここで説明するようにプロパティを確認して指定します。 - Kafkaアクセス・ディレクトリの作成
セキュアなKafkaクラスタにアクセスするには、KafkaクラスタごとにKafkaアクセス・ディレクトリを作成する必要があります。
親トピック: Kafkaクラスタへのアクセスの構成
22.9.2.1 Kafka構成ファイルについて
osakafka.properties
ファイルには、保護されているKafkaクラスタへのアクセスに必要な構成情報と、Oracle SQL Access to Kafkaに関する追加情報が含まれています。
Kafka構成ファイルosakafka.properties
は、クラスタ・アクセス・ディレクトリに作成されます。osakafka.properties
ファイルは、(OSAK_ADMIN_ROLE
が付与された) Oracle SQL Access to Kafka管理者が作成します。このファイルは、Apache Kafkaクラスタに接続するためにDBMS_KAFKA_ADM
パッケージによって使用されます。
Oracle SQL Access to Kafka管理者は、各Kafkaクラスタの構成ファイルを格納するクラスタ・アクセス・ディレクトリを作成します。各クラスタ・アクセス・ディレクトリには、独自のKafka構成ファイルがあります。Apache Kafkaクラスタへのアクセスを管理するために、1人のOracle SQL Access to Kafka管理者のみが、Kafkaクラスタのクラスタ・アクセス・ディレクトリに対する読取りおよび書込みアクセス権を持ちます。他のユーザーには、クラスタ・アクセス・ディレクトリまたはKafka構成ファイルに対する権限は付与されません。
Kafka構成ファイルの機能
osakafka.properties
ファイルは、librdkafka
を使用するKafkaコンシューマで使用されるコンシューマ・プロパティ・ファイルに似ています。セキュアなApache Kafkaクラスタには、認証局、クライアント秘密キーおよびクライアント公開証明書(PEM)などの資格証明ファイルが必要です。これらの追加ファイルも、librdkafka
を使用するKafkaコンシューマに必要なものと似ています。osakafka.properties
ファイルには、次のプロパティがあります:
- Kafkaクラスタへのアクセスに必要な設定および構成の一部として、Oracle SQL Access to Kafka管理者によって作成および管理されます。
- キーと値のペアのテキスト・ファイルで構成されます。各行の形式は、キーおよび値を記述する
キー=値
で、改行で終了します。改行文字をキーまたは値の一部にすることはできません。 osak
接頭辞で識別されるOracle SQL Access to Kafkaパラメータが含まれます。- Oracle SQL Access to Kafkaのデバッグ・プロパティが含まれます。
librdkafka
APIを使用してKafkaクラスタに接続するためにDBMS_KAFKA_ADM
パッケージによって使用されます。- セキュアなKafkaクラスタが、
librdkaka
インタフェースを使用してKafkaクラスタに接続するために必要なセキュリティ構成プロパティ、osak
接頭辞で識別されるOracle SQL Access to Kafkaチューニング・プロパティ、およびデバッグ・プロパティを格納するために必要です。セキュアなクラスタ・アクセスの場合、キーと値のペアには、SSL/TLS証明書、クライアントの公開キーおよび秘密キーなどのクラスタ構成ファイルが含まれます。 - セキュアでないKafkaクラスタでは、クラスタ接続用のチューニング・プロパティとデバッグ・プロパティを含めることはオプションです
osakafka.properties
ファイルは、Oracle SQL Access to Kafkaクラスタ・アクセス・ディレクトリのパスORACLE_base/osak/clusters/cluster-name/config
に格納されます。ここで、Oracle_base
は、ターゲットOracle DatabaseのOracleベース・ディレクトリで、cluster-name
は、アクセス情報が構成ファイルに格納されるKafkaクラスタの名前です。
Kafka構成ファイルの作成に関するガイドライン
Apache Kafkaクラスタ、Oracle SQL Access to Kafka管理者へのアクセスに必要な設定および構成の一部として、このファイルの情報を使用して、Cインタフェースでセッション・コンテキストを設定します。これにより、librdkafka
APIを使用してKafkaクラスタに接続されます。
SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES
システム表には、サポートされているコンシューマ構成プロパティ(セキュリティ・プロパティを含む)の事前移入済リストが含まれています。拡張性のために、SYS
はこの表に特定の制限付きでさらにプロパティを追加できます
DBMS_KAFKA_ADM.REGISTER_CLUSTER()
プロシージャは、SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES
システム表にもリストされているosakafka.properties
ファイルからこれらのプロパティのみを読み取ります。その他のプロパティは無視されます。
22.9.2.2 Oracle SQL Access for Kafka構成ファイルのプロパティ
osakafka.properties
ファイルを作成するには、ここで説明するようにプロパティを確認して指定します。
osakafka.propertiesファイル処理
osakafka.properties
に指定するプロパティは、次の表に示すプロパティである必要があります。その他のキーと値のペアを指定すると、これらの値は無視されます。
次の点に注意してください。
osak
接頭辞があるプロパティ名は、内部チューニング・プロパティまたはデバッグ・プロパティです。osak
接頭辞がないプロパティ名は、librdkafka
で使用されるKafkaコンシューマ・プロパティです。プロパティの完全なリストについては、Apache Kafka C/C++クライアント・ライブラリ(librdkafka
)のドキュメントを参照してください。
プロパティ | 許可される値 | 説明 |
---|---|---|
security.protocol |
PLAINTEXT SSL SASL_PLAIN_TEXT SASL_SSL |
Kafkaブローカとの通信に使用されるセキュリティ・プロトコル |
sasl.mechanisms |
GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512 |
認証に使用するSASLメカニズム ノート: 複数形の名前にしても、構成する必要があるメカニズムは1つのみです。 このプロパティは、古いKafkaクラスタに下位互換性を提供できます。可能な場合は、プロパティ |
sasl.mechanism |
GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512
|
認証に使用するSimple Authentication and Security Layer (SASL)メカニズム |
|
クラスタ構成ディレクトリ内のファイル | ブローカ・キーを検証するための認証局(CA)証明書のファイル名。絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。 |
ssl.key.location |
クラスタ構成ディレクトリ内のファイル |
クライアント秘密キーのファイル名 絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。 対応するパスワード値は、 |
ssl.certificate.location |
クラスタ構成ディレクトリ内のファイル |
クライアント公開(PEM)キーのファイル名 絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。 |
ssl.endpoint.identification.algorithm |
有効な値: https none |
Kafkaブローカ証明書を使用してKafkaブローカ・ホスト名を検証するためのエンドポイント識別アルゴリズム。値は次のとおりです。
デフォルト値: |
sasl.username |
ユーザー名 |
Kafkaクラスタへの認証に必要なユーザー名。 このユーザー名に対応するパスワード値は、 |
sasl.kerberos.principal |
クライアントKafka Kerberosプリンシパル名 |
クライアントKerberosプリンシパル名 |
sasl.kerberos.ccname |
Kerberosチケット・キャッシュ・ファイル名
|
Kerberosチケット・キャッシュ・ファイル 例: このファイルは、クラスタ構成ディレクトリに存在する必要があります。 |
sasl.kerberos.config |
Kerberos構成ファイル名 |
KafkaクラスタのKerberos構成。例: このファイルは、クラスタ構成ディレクトリに存在する必要があります |
sasl.kerberos.service.name |
Kerberosプリンシパル名(Kafkaプライマリ名) |
Kerberosプリンシパルのプライマリ名。スラッシュ(/ )の前に表示される名前です。たとえば、kafkaはKerberosプリンシパルkafka/broker1.example.com@EXAMPLE のプライマリ名です。
|
max.partition.fetch.bytes |
1024 * 1024 |
|
debug |
すべて | 接続の問題をデバッグするために使用されます。 |
例
次に、セキュリティ・プロトコルSSL
を指定し、クライアント上で認証局(CA)証明書を使用して認証を提供するosakafka.properties
ファイルの例を示します:
security.protocol=ssl
ssl.ca.location=ca-cert
ssl.certificate.location=client_myhostname_client.pem
ssl.key.location=client_myhostname_client.key
22.9.2.3 Kafkaアクセス・ディレクトリの作成
セキュアなKafkaクラスタにアクセスするには、KafkaクラスタごとにKafkaアクセス・ディレクトリを作成する必要があります。
Oracle SQL Access to Kafka管理者は、オペレーティング・システム・ディレクトリOracle-base/osak/
を作成します。この場合、cluster_name
/configOracle-base
はOracleベース・ディレクトリ、cluster_name
はSYS.DBMS_KAFKA_ADM.REGISTER_CLUSTER
コールに渡されるクラスタ名パラメータの値です。各Kafkaクラスタには、独自の専用Kafkaクラスタ・ディレクトリが必要です。
このディレクトリには、Kafkaクラスタへのアクセスに必要なすべての構成ファイルが含まれている必要があります:
osakafka.properties
ファイル。osakafka.properties
ファイルにリストされているセキュリティ・ファイル
次の例では、Oracleベース・ディレクトリは/u01/app/oracle
で、クラスタ名はkafkaclus1
です:
mkdir u01/app/oracle/osak/kafkaclus1/config;
CREATE DIRECTORY osak_kafkaclus1_config AS
‘u01/app/oracle/osak/kafkaclus1/config’ ;
22.9.3 Kafka構成ファイルのプロパティ
ここで説明するプロパティは、Kafka構成ファイルosakafka.properties
で使用されます。
説明
Kafka構成ファイルのプロパティには、Apache Kafkaクラスタの構成情報が含まれています。Kafka構成ファイルには、2つのカテゴリのプロパティ名があります:
- コンシューマ構成プロパティ・パラメータは、Apache Kafkaブローカで使用されるプロパティです。これらのファイル
- Oracleプロパティは、
osak
接頭辞が付いたプロパティ名です。これらのプロパティは、内部チューニングまたはデバッグに使用されます。
Kafka構成ファイルにリストされているプロパティは、サポートされているすべてのプロパティを含むシステム表SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES
とクロスチェックされます。osakafka.properties
ファイルに指定されているが、SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES
表にリストされていないプロパティは、OSAKによって無視されます。osakafka.properties
ファイルで許可されるプロパティおよび値を次に示します:
表22-4 Kafka構成ファイルのプロパティ名および説明
プロパティ名 | 許可される値 | 説明 |
---|---|---|
|
|
Kafkaブローカとの通信に使用されるセキュリティ・プロトコル |
|
|
認証に使用するSASLメカニズム ノート: 複数形の名前にしても、構成する必要があるメカニズムは1つのみです。 このプロパティは、古いKafkaクラスタに下位互換性を提供できます。可能な場合は、プロパティ |
|
|
認証に使用するSASLメカニズム |
|
クラスタ構成ディレクトリ内のファイル |
ブローカ・キーを検証するための認証局(CA)証明書のファイル名。絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます |
|
クラスタ構成ディレクトリ内のファイル |
クライアント秘密キーのファイル名 絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます 対応するパスワード値は、 |
|
クラスタ構成ディレクトリ内のファイル |
クライアント公開(PEM)キーのファイル名 絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。 |
|
有効な値: デフォルト値: |
Kafkaブローカ証明書を使用してKafkaブローカ・ホスト名を検証するためのエンドポイント識別アルゴリズム。
|
|
Kafkaクラスタでの認証に必要なユーザー名 |
Kafkaクラスタでの認証に必要なユーザー名。 対応するパスワード値は、 |
|
クライアントKafka Kerberosプリンシパル名 |
クライアントKerberosプリンシパル名 |
|
Kerberosチケット・キャッシュ・ファイル名 |
Kerberosチケット・キャッシュ・ファイル 例: このファイルは、クラスタ構成ディレクトリに存在する必要があります。 |
|
Kerberos構成ファイル名 |
KafkaクラスタのKerberos構成。 例: このファイルは、クラスタ構成ディレクトリに存在する必要があります |
|
Kafkaで実行するKerberosプリンシパル名。 |
Kafkaで実行するKerberosプリンシパル名。 |
|
1024 * 1024 |
|
|
すべて |
接続の問題をデバッグするために使用されます |
例22-4 プロパティを含む構成ファイル
osakafka.properties file for security protocol: SSL with client authentication
security.protocol=ssl
ssl.ca.location=ca-cert
ssl.certificate.location=client_myhostname_client.pem
ssl.key.location=client_myhostname_client.key
親トピック: Kafkaクラスタへのアクセスの構成
22.9.4 クラスタ・アクセス・ディレクトリに必要なセキュリティ構成ファイル
セキュリティ・プロトコルに基づいて、必要な構成ファイルを特定します。
セキュアなKafkaクラスタへのアクセスを構成するには、Oracle SQL Access to Kafka管理者は、Kafkaクラスタ・アクセス・ディレクトリから複数の構成ファイルを追加する必要があります。必要なファイルのリストは、Kafkaクラスタでセキュリティを構成するために使用されるセキュリティ・プロトコルによって異なります。ファイル・リストには、認証局ファイル、SSLクライアント公開証明書ファイル(PEM形式)、SSLクライアント非公開キー・ファイルなどのファイルを含めることができます。
ノート:
Kerberosチケット管理はOracle SQL Access to Kafkaの外部で処理されるため、Kerberoskeytab
ファイルは必要ありません。
- SASL_SSL/GSSAPI
クラスタ・アクセス・ディレクトリに必要なファイルであるGSSAPI
認証プロトコルを使用するSASL_SSL
を持つApacheクラスタ - SASL_PLAINTEXT/GSSAPI
クラスタ・アクセス・ディレクトリに必要なファイルであるGSSAPI
認証プロトコルを使用するSASL_PLAINTEXT
を持つApacheクラスタ - SASL_PLAINTEXT/SCRAM-SHA-256
クラスタ・アクセス・ディレクトリに必要なファイルであるSCRAM-SHA-256
認証プロトコルを使用するSASL_PLAINTEXT
を持つApacheクラスタ - SASL_SSL/PLAIN
クラスタ・アクセス・ディレクトリに必要なファイルであるPLAIN
認証プロトコルを使用するSASL_SSL
を持つApacheクラスタ - クライアント認証を使用したSSL
クラスタ・アクセス・ディレクトリに必要なファイルであるSSL
認証プロトコルを持つApacheクラスタ - クライアント認証を使用しないSSL
クラスタ・アクセス・ディレクトリに必要なファイルであるSSL
認証プロトコルを持ちクライアント認証を使用しないApacheクラスタ
親トピック: Kafkaクラスタへのアクセスの構成
22.9.4.1 SASL_SSL/GSSAPI
クラスタ・アクセス・ディレクトリに必要なファイルであるGSSAPI
認証プロトコルを使用するSASL_SSL
を持つApacheクラスタ
説明
SASL_SSL/GSSAPIプロトコルは、暗号化を使用したKerberos認証を指定します。Kerberosチケットは外部で管理する必要があります(Oracle SQL Access to Kafkaの外部)。
DBMS_CREDENTIAL
Kerberosチケットは外部で管理されるため、必須ではありません。
クラスタ・アクセス・ディレクトリの必須ファイル
- 認証局(CA)ファイル
- CAファイルを指定する
SSL.CA.location
を含むosakafka.properties
ファイルは、SSL認証局です。
次の例では、プロパティsecurity.protocol
はSASL_SSL
を指定します。プロパティsasl.mechanism
は、GSSAPI
を指定します。CAファイルはCA-cert.pem
で、プロパティssl.CA.location
で指定されます。
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.config=krb5.conf
sasl.kerberos.ccname=krb5ccname_osak
sasl.kerberos.principal=kafkaclient/<FQDN-hostname>@<Realm>
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https
22.9.4.2 SASL_PLAINTEXT/GSSAPI
クラスタ・アクセス・ディレクトリに必要なファイルであるGSSAPI
認証プロトコルを使用するSASL_PLAINTEXT
を持つApacheクラスタ
説明
SASL_PLAINTEXT/GSSAPI
プロトコルは、暗号化を使用せずにKerberos認証を指定します。Kerberosチケットは外部で管理する必要があります(Oracle SQL Access to Kafkaの外部)。
DBMS_CREDENTIAL
Kerberosチケットは外部で管理されるため、必須ではありません。
クラスタ・アクセス・ディレクトリの必須ファイル
- CAファイルを指定する
SSL.CA.location
を含むosakafka.properties
ファイルは、SSL認証局です。
次の例では、プロパティsecurity.protocol
はSASL_PLAINTEXT
を指定し、プロパティsasl.mechanism
はGSSAPI
を指定します。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafkaclient/FQDN-hostname@Realm
sasl.kerberos.config=krb5.conf
sasl.kerberos.ccname=krb5ccname_osak
22.9.4.3 SASL_PLAINTEXT/SCRAM-SHA-256
クラスタ・アクセス・ディレクトリに必要なファイルであるSCRAM-SHA-256
認証プロトコルを使用するSASL_PLAINTEXT
を持つApacheクラスタ
説明
SASL_PLAINTEXT/SCRAM-SHA-256
プロトコルは、暗号化を使用せずにSASL SCRAM認証を指定します。
DBMS_CREDENTIAL
SASLユーザー名のパスワードを格納するために必要です。
クラスタ・アクセス・ディレクトリの必須ファイル
osakafka.properties
ファイル。
次の例では、プロパティsecurity.protocol
はSASL_PLAINTEXT
を指定し、プロパティsasl.mechanism
はSCRAM-SHA-256
を指定します。
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.username=testuser
22.9.4.4 SASL_SSL/PLAIN
クラスタ・アクセス・ディレクトリに必要なファイルであるPLAIN
認証プロトコルを使用するSASL_SSL
を持つApacheクラスタ
説明
SASL_SSL/PLAIN
プロトコルは、使用されたOSS Kafkaクラスタの設定を指定します
DBMS_CREDENTIAL
sasl.password
を格納するために必要です。
クラスタ・アクセス・ディレクトリの必須ファイル
osakafka.properties
ファイル。
例22-5 OSSクラスタのosakafka.properties
ファイル
次の例では、プロパティsecurity.protocol
はSASL_SSL
を指定し、プロパティsasl.mechanism
はPLAIN
を指定します。
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<tenancyName>/<username>/<streamPoolID>
#-- limit request size to 1 MB per partition
max.partition.fetch.bytes=1048576
例22-6 非OSSクラスタのosakafka.properties
ファイル
次の例では、プロパティsecurity.protocol
はSASL_SSL
を指定し、プロパティsasl.mechanism
はPLAIN
を指定します。ssl.ca.locationプロパティは、認証局(CA)ファイルを指定します。CAファイルはCA-cert.pem
です。
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=kafkauser
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https
22.9.4.5 クライアント認証を使用したSSL
クラスタ・アクセス・ディレクトリに必要なファイルであるSSL
認証プロトコルを使用するApacheクラスタ
説明
SSL
プロトコルは、クライアント認証を使用してSSLを指定します。
DBMS_CREDENTIAL
SSLキーのパスワードを格納するために必要です。
クラスタ・アクセス・ディレクトリの必須ファイル
osakafka.properties
ファイル。- 構成権限(CA)ファイル
- rdkafkaクライアントのPEMファイル(
rdkafka.client.pem
) - rdkafkaクライアント・キー(
rdkafka.client.key
)
例22-7 SSL osakafka.properties
ファイル
次の例では、プロパティsecurity.protocol
はSSL
を指定し、プロパティssl.key.location
はrdkafkaクライアント・キーを指定し、sa.ca.locationプロパティは認証局ファイルを指定します。
security.protocol=SSL
ssl.certificate.location=rdkafka.client.pem
ssl.key.location=rdkafka.client.key
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https
22.9.4.6 クライアント認証を使用しないSSL
クラスタ・アクセス・ディレクトリに必要なファイルであるSSL
認証プロトコルを持ちクライアント認証を使用しないApacheクラスタ
説明
SSL
プロトコルは、クライアント認証を使用せずにSSLを指定します。
DBMS_CREDENTIAL
必須ではありません。
クラスタ・アクセス・ディレクトリの必須ファイル
osakafka.properties
ファイル。- 構成権限(CA)ファイル
例22-8 SSL osakafka.properties
ファイル
次の例では、プロパティsecurity.protocol
はSSL
を指定し、sa.ca.location
プロパティは認証局ファイルを指定します。
security.protocol=SSL
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https
22.10 Oracle SQL Access to Kafkaアプリケーションの作成
Apacheクラスタ・データにアクセスするアプリケーションを作成するには、必要なアプリケーションのタイプを作成します。
Oracle SQL Access to Kafkaには、Apache Kafkaクラスタへのアタッチに使用できる次のアプリケーション・モードがあります:
- ロード中: KafkaトピックからOracle Database表にデータをロードするために使用します。
- ストリーミング: Kafkaトピックを順次読み取るために使用します。
- シーク可能: 指定した開始タイムスタンプから終了タイムスタンプまでの間にKafkaトピックにランダムにアクセスするために使用します。
必要なKafkaトピックへのアクセスの種類に応じて、作成するアプリケーションのタイプを選択します:
DBMS_KAFKA.CREATE_LOAD_APP
は、ロード・モードで使用できるアプリケーションを作成します。DBMS_KAFKA.CREATE_STREAMING_APP
は、ストリーミング・モードで使用できるアプリケーションを作成します。DBMS_KAFKA.CREATE_SEEKABLE_APP
は、シーク可能モードで使用できるアプリケーションを作成します。
例22-9 Kafkaトピックの4つのビューを使用したストリーミング・アプリケーションの作成
次の例では、4つのパーティションを持つKafkaトピックの一時表を含む一連の4つのビューを使用するためのストリーミング・アプリケーションが作成されます。ビューごとに一時表が作成されます。各ビュー(および一時表)は、Kafkaトピックの1つのパーティションに関連付けられています:
DECLARE
v_options VARCHAR2;
BEGIN
v_options := ‘{"fmt" : "DSV", "reftable" : "user_shape_table_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
4);
END;
/
例22-10 Kafkaトピックの1つのビューを使用したストリーミング・アプリケーションの作成
次の例では、Kafkaトピックの一時表が4つのパーティションを持つ一時表と1つのビューを使用するためのストリーミング・アプリケーションが作成されます。ビュー(一時表)は、Kafkaトピック全体に関連付けられています:
DECLARE
v_options VARCHAR2;
BEGIN
v_options := ‘{"fmt" : "DSV", "reftable" : "user_shape_table_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
1);
END;
/
親トピック: Oracle SQL Access to Kafka
22.11 Kafkaクラスタ接続のセキュリティ
Oracle SQL Access to Kafkaは、SSL、SASL、Kerberosなどの様々なセキュリティ・メカニズムを使用して、KafkaおよびOracle Streaming Service (OSS)へのアクセスをサポートします。
ノート:
Kafkaクラスタへのアクセスに使用される資格証明には、Kafkaブローカ・メタデータと、Oracle SQL Access to Kafkaアプリケーションの一部となるトピックの両方へのアクセス権が必要です。資格証明に対して有効なアクセス制御リスト(ACL)がある場合は、ブローカとKafkaトピックの両方にアクセス権が付与されていることを確認します。共有Oracle Real Application Clusters (Oracle RAC)環境では、セキュリティ資格証明は、クラスタ・メンバー・ノードに対してローカルではなく、共有の場所に配置する必要があります。セキュアなKafkaクラスタ
Oracle Databaseとクラスタ間でセキュアに暗号化されたデータ転送を維持するために、Oracle SQL Access to Kafkaには、いくつかのセキュリティ・プロトコルが採用されています。セキュアなKafkaクラスタおよびOracle Streaming Services (OSS)クラスタへのアクセスには、セキュリティ構成ファイルが使用されます。これらのオペレーティング・システム・ファイルは、クラスタ構成ディレクトリに存在する必要があります。クラスタ構成ファイルにアクセスするために、クラスタ構成Oracleディレクトリ・オブジェクトが作成されます。このディレクトリへのREAD
アクセス権が付与されるのは、osak_admin_role
のみです。クラスタ構成ファイルは、osak_admin_role
によってのみ読取り可能です。クラスタ構成ファイルには、osakafka.properties
ファイルと、SSL/TLS/PEMファイルや証明書などの追加のセキュリティ・ファイルが含まれます。SSLのキーおよび証明書は、Oracleキーストアに格納されます。
クラスタ・アクセスOracleディレクトリ・オブジェクトは、Kafkaクラスタへのアクセスを制御するために使用されます。このディレクトリ・オブジェクトには構成ファイルが含まれていません。Kafkaセッションは、マルチテナント環境の個々のPDBに限定されます。Kafkaブローカに接続するアプリケーションを作成する各PDBは、独自のアプリケーションを作成する必要があります。
パスワードをファイルに埋め込まないでください。osakafka.properties
ファイルに埋め込まれたパスワード・プロパティは無視されます。すべてのパスワードは、DBMS_CREDENTIAL
パッケージを使用してデータベース資格証明として格納する必要があります。
Kerberos認証を使用したKafkaクラスタ
Kerberos認証を使用するKafkaクラスタの場合、osakafka.properties
ファイルで指定されたKafkaプリンシパルのKerberosチケットをデータベース・システムで取得し、Oracle SQL Access to Kafkaの外部で定期的に更新する必要があります。
クラスタ構成ディレクトリ・オブジェクト、クラスタ・アクセス・ディレクトリ・オブジェクトおよびデータベース資格証明名は、DBMS_KAFKA_ADM.REGISTER_CLUSTER()
コールの入力パラメータとして指定する必要があります。
Oracle SQL Access to Kafka管理者(osak_admin_role
を持つユーザー、OSAK_ADMIN)は、クラスタ登録および管理タスクを実行します。
親トピック: Oracle SQL Access to Kafka
22.12 セキュアでないKafkaクラスタへのアクセスの構成
セキュアでないKafkaクラスタへのアクセスを構成するには、OSAK管理者(osak_admin_roleを持つOracle Databaseユーザー)がこの手順を完了する必要があります。
セキュアでないKafkaクラスタにアクセスするには、Kafkaクラスタへのアクセスを制御するクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成する必要があります。このデータベース・ディレクトリの権限付与は、KafkaクラスタにアクセスできるOracle Databaseユーザーの制御に使用されます。このデータベース・ディレクトリには空のパスがあります。対応するオペレーティング・システム・ディレクトリは必要なく、ファイルも含まれていません。クラスタ・アクセス・データベース・ディレクトリ・オブジェクトのOracleディレクトリ・オブジェクト名は、OSAK_CLUSTER_NAME_ACCESS
という形式(CLUSTER_NAME
はKafkaクラスタの名前)にすることをお薦めします。
手順:
-
パスが空のクラスタ・アクセス・データベース・ディレクトリを作成します。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。
たとえば、空のパスを持つ
oaskaccess_kafkaclust1
という名前のクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成します。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_ACCESS AS '';
-
ターゲットのOracle Databaseサーバーで、パス
Oracle_base/osak/cluster_name/config
(Oracle_base
はOracleベース・ディレクトリ、cluster_name
はKafkaクラスタ名)を使用して、Oracleベース・パス・ディレクトリにクラスタ構成オペレーティング・システム・ディレクトリを作成します。たとえば:mkdir /u01/app/oracle/osak/kafkaclus2/config
SYSDBAとしてデータベースにログインし、SQLを起動して、対応するOracleディレクトリ・オブジェクトを作成します。この例では、Kafkaクラスタ名は
KAFKACLUS2
です:SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_CONFIG AS 'u01/app/oracle/osak/kafkaclus2/config';
-
空の
osakafka.properties
ファイルまたはOSAKチューニングまたはデバッグ・プロパティを含むosakafka.propertiesファイルを作成します。 -
SQLで、
DBMS_KAKFA_ADM.REGISTER_CLUSTER()
を使用してKafkaクラスタを登録します。たとえば、サーバー・ホスト名mykafkabootstrap-host
、ポート9092をKafkaクラスタKAFKACLUS2
に使用します:SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER ( cluster_name => ‘KAFKACLUS2’, bootstrap_servers =>‘Kafka-example-host:9092’, kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, cluster_access_dir => ‘OSAK_KAFKACLUS2_ACCESS’, credential_name => NULL, cluster_config_dir => ‘OSAK_KAFKACLUS2_CONFIG’, cluster_description => 'My test cluster kafkaclus2’, options => NULL) from dual;
構成に成功すると、登録の戻り値は0になります:
SQL> DBMS_KAFKA_ADM_RE….. 0
- Kafkaユーザーに読取りアクセス権を付与します。次の例では、ユーザー
app2-usr
に、KAFKACLUS2
という名前のKafkaクラスタへのアクセス権が付与されます:SQL> grant read on directory osak_kafkaclus2_access to
app2-usr
;
親トピック: Oracle SQL Access to Kafka
22.13 セキュアなKafkaクラスタへのアクセスの構成
セキュアなKafkaクラスタへのアクセスを構成するには、この手順を使用します。
セキュアなKafkaクラスタへのアクセスには、osakafka.properties
などの構成ファイルと、SSL/TLS PEMファイルや証明書などの追加のセキュリティ・ファイルが必要です。これらのファイルは、クラスタ構成データベース・ディレクトリ・オブジェクトに格納されます。構成ファイルおよびディレクトリは、オペレーティング・システム・ディレクトリおよびファイル・アクセス権限によって保護されます。
クラスタ構成のオペレーティング・システム・ディレクトリは、Oracleベース・ディレクトリで構成され、Oracle Installatoin所有者またはOracleユーザー(oracle)およびOracle Inventoryグループ(oinstall)が所有しています。OracleユーザーおよびOracle Inventoryグループは、750 (rwxr-x---
)に設定されたディレクトリ権限を持つ必要があり、ディレクトリ内のosakafka.properties
ファイルは、540 (rw-r-----
)に設定された権限を持つ必要があります。クラスタ構成ディレクトリ内の他のすべてのファイルは、440 (r--r-----
)に設定された権限を持つ必要があります。
- Oracle SQL Access for Kafka構成ファイル(
osakafka.properties
)が作成され、クラスタ構成データベース・ディレクトリ・オブジェクトに格納されます。 - 選択したセキュリティ方式のセキュリティ・ファイル(Kerberos、SSL、PEMファイルを使用したTLS/SSL、およびそのために作成された証明書など)は、クラスタ構成データベース・ディレクトリ・オブジェクトに格納されます。
手順:
-
Kafkaクラスタへのアクセスを制御するクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成します。このデータベース・ディレクトリ・オブジェクトの権限付与は、KafkaクラスタにアクセスできるOracle Databaseユーザーの制御に使用されます。このデータベース・ディレクトリには空のパスがあります。つまり、対応するオペレーティング・システム・ディレクトリは必要なく、ファイルは含まれません。
たとえば、空のパスを持つ
oaskaccess_kafkaclust1
という名前のクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成します。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。SQL> CREATE DIRECTORY osakaccess_kafkaclus1 AS '';
-
ターゲットのOracle Databaseサーバーで、パス
Oracle_base/osak/cluster_name/config
(Oracle_base
はOracleベース・ディレクトリ、cluster_name
はKafkaクラスタ名)を使用して、Oracleベース・パス・ディレクトリにクラスタ構成オペレーティング・システム・ディレクトリを作成します。たとえば:mkdir /u01/app/oracle/osak/kafkaclus1/config
-
SYSDBAとしてデータベースにログインし、SQLを起動して、対応するOracleディレクトリ・オブジェクトをターゲットのOracle Databaseに作成します。データベース・オブジェクト名に
OSAK_
を使用することをお薦めします。clusternamneはKafkaクラスタの名前です。たとえば:clustername
_accessCREATE DIRECTORY OSAK_KAFKACLUS1_CONFIG AS '/u01/app/oracle/osak/kafkaclus1/config';
-
使用するセキュリティ・プロトコルに基づいて、クラスタ構成ディレクトリに
osakafka.properties
ファイルを作成します。このファイルは、librdkafka
クライアント・プロパティ・ファイルに似ています。次の例では、クライアント認証を使用してセキュリティ・プロトコルにSecure Socket Layer (SSL)を使用するように
osakafka.properties
ファイルが構成されます。security.protocol=ssl ssl.ca.location=ca-cert ssl.certificate.location=client_myhostname_client.pem ssl.key.location=client_myhostname_client.key ssl.key.password=password-that-is-ignored
-
osakafka.properties
で参照されるセキュリティ・ファイルをクラスタ構成ディレクトリにコピーします。たとえば、ca-certパスが/etc/ssl/certs/
である場合:$cp /etc/ssl/certs/ca-cert /u01/app/oracle/osak/kafkaclus1/config; $cp /etc/ssl/certs/client-myhostname-client.pem /u01/app/oracle/osak/kafkaclus1/config; $cp /etc/ssl/certs/client-myhostname-client.key /u01/app/oracle/osak/kafkaclus1/config;
- 資格証明を設定します:
-
osakafka.properties
ファイルでSSL.key.location
またはsasl.username
プロパティを使用する場合:SSL SASI認証を使用して、Kafkaクラスタでの認証に必要なパスワードを格納するためのデータベース資格証明を作成します。対応するパスワード・プロパティ
ssl.key.password
またはsasl.password
は、クラスタ登録プロセス中にDBMS_KAFKA
によって自動的に追加されます。たとえば:begin dbms_credential.create_credential( credential_name => 'KAFKACLUS1CRED1', username => 'KAFKACLUS1', password => 'enter-ssl-key-password-or-sasl-password); end; /
-
Kafkaクラスタが認証メカニズムとしてGSSAPI/Kerberosを使用する場合:
osakafka.properties
ファイルにリストされているKafkaプリンシパルのデータベース・システムでKerberosチケットを取得します
-
-
SYSDBA
としてログインし、SQLを起動し、SYS.DBMS_KAFKA_ADM.REGISTER_CLUSTER()
プロシージャを使用してKafkaクラスタを登録します。次の例では、KafkaクラスタKAFKACLUS1
が登録されます:select DBMS_KAFKA_ADM.REGISTER_CLUSTER ('KAFKACLUS1', 'mykafkabootstrap-host:9092', DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, 'OSAK_KAFKACLUS1_ACCESS' 'KAFKACLUS1CRED1', 'OSAK_KAFKACLUS1_CONFIG', 'My test cluster kafkaclus1') from dual;
成功すると、0が返されます。たとえば:
SQL> DBMS_KAFKA_ADM_RE….. 0
- Kafkaユーザーに読取りアクセス権を付与します。次の例では、ユーザー
app1-usr
に、KAFKACLUS1
という名前のKafkaクラスタへのアクセス権が付与されます:SQL> grant read on directory OSAK_KAFKACLUS1_ACCESS to
app1-usr
;
親トピック: Oracle SQL Access to Kafka
22.14 Oracle SQL Access to Kafkaクラスタの管理
Oracle SQL Access to Kafkaを使用してKafkaクラスタ定義を更新、一時的に無効化および削除する方法を確認してください
- Kafkaクラスタへのアクセスの更新
Kafkaクラスタ環境が変更された場合、それらの変更のクラスタ定義および構成を更新できます。 - Kafkaクラスタへのアクセスの無効化または削除
Oracle SQL Access to Kafkaクラスタを一時的に無効にしたり、不要になった場合はその接続を削除したりできます。
親トピック: Oracle SQL Access to Kafka
22.14.1 Kafkaクラスタへのアクセスの更新
Kafkaクラスタ環境が変更された場合、それらの変更のクラスタ定義および構成を更新できます。
22.14.2 Kafkaクラスタへのアクセスの無効化または削除
Oracle SQL Access to Kafkaクラスタを一時的に無効にしたり、不要になった場合はその接続を削除したりできます。
例22-11 Kafkaクラスタの無効化
Kafka環境の一時的な停止中に、Kafkaクラスタへのアクセスを一時的に無効にできます
- DBMS_KAFKA_ADM.DISABLE_CLUSTER (後ろに次が続く)
- DBMS_KAFKA_ADM.ENABLE_CLUSTER (Kafka環境がバックアップされている場合)
例22-12 Kafkaクラスタの削除
クラスタ定義が不要になった場合、OSAK管理者はクラスタ定義を削除できます
- DBMS_KAFKA_ADM.DEREGISTER_CLUSTER
22.15 KafkaデータとOracle SQL Access to Kafkaの使用に関するガイドライン
アプリケーション開発計画の一部としてガイドライン、制限および推奨事項を確認します。
- Kafka一時表およびアプリケーション
Oracle SQL Access to Kafkaビューおよび対応する一時表は、一意のKafkaアプリケーション(グループID)にバインドされており、そのアプリケーションにかわってトピック内の1つ以上のパーティションに排他的にアクセスする必要があります。 - ストリーミングを使用した複数のアプリケーションとのKafkaデータの共有
複数のアプリケーションがKafkaデータを使用できるようにするには、Oracle SQL Access to Kafkaを使用して、Kafka表をユーザー表にストリーミングします。 - Kafka表の削除および再作成
KafkaオフセットはDBMS_KAFKA
メタデータ表によって管理されるため、Kafkaトピック構成の変更には、Oracle SQL Access to Kafkaアプリケーションに対する手動更新が必要になる場合があります。
親トピック: Oracle SQL Access to Kafka
22.15.1 Kafka一時表およびアプリケーション
Oracle SQL Access to Kafkaビューおよび対応する一時表は、一意のKafkaアプリケーション(グループID)にバインドされており、そのアプリケーションにかわってトピック内の1つ以上のパーティションに排他的にアクセスする必要があります。
これらのガイドラインを使用して、アプリケーションの制限を支援します。
KafkaグループIDおよびOracle SQL Access to Kafka一時表
標準のOracle表およびビューとは異なり、Apache Kafkaデータの消費に関するルールに従って、Kafka一時表を複数のアプリケーションで共有することはできません。Kafkaデータの場合、各一時表は、特定の時点でKafkaから直接フェッチされるデータのスナップショットであり、Kafkaクラスタを識別する正規名形式、アプリケーション名、ビューID、Kafka内のコンシューマ・グループID (groupID
)に関連付けられたアプリケーションのかわりにクラスタまたはトピック内の1つ以上のパーティションにアクセスする特定のビューを識別する整数があります。Oracle Databaseで作成された一時ビューおよび表は、一意のKafkaアプリケーション(groupID
で識別)にバインドされており、そのアプリケーションのかわりにトピック内の1つ以上のパーティションに排他的にアクセスする必要があります。これらのパーティションへのアクセスを他のアプリケーションと同時に共有することはできません。この制限は、Oracleアプリケーション・インスタンスにまで及びます。Oracle SQL Access to Kafkaビューおよびその関連の一時表は、そのアプリケーション専用である必要があります。同じKafkaトピックまたはパーティション・データを問い合せるよう複数のアプリケーションを構成する場合、これらのアプリケーションがそれ自体を別のアプリケーション(つまり、異なる一意のKafkaグループIDを持つアプリケーション)として識別し、独自のグループIDとアプリケーション・アイデンティティ、および追跡する一連の独自のオフセットが反映された、独自のOracle SQL Access to Kafkaアプリケーションを作成する必要があります。
Oracle SQL Access to Kafkaでビューおよび表を使用するためのガイドライン
アプリケーションのビューおよび表を、そのデータで実行する分析の種類に従って作成します。
アプリケーションで分析にOracle SQLを使用するには、問合せ対象のデータのすべてのパーティションを取得するそのアプリケーションのOracle SQL Access to Kafkaビューを作成することをお薦めします。単一のアプリケーション・インスタンスによるアクセスごとに、トピック内のすべての新しいKafkaデータが取得され、アプリケーションが格納または表示できる集計情報が生成されます。
Oracle SQLを使用して分析を実行せず、かわりにアプリケーション自体で複雑なロジックを使用する場合は、アプリケーション・インスタンスをスケールアウトし、各Oracle SQL Access to Kafkaビューを単一のアプリケーション・インスタンスのかわりに単一のパーティションにアクセスさせることをお薦めします。この場合は通常、Kafkaデータは標準のOracle表に結合され、アプリケーションに返されるデータをエンリッチします。
アプリケーションでより多くの分析が実行される前に一部のSQL分析および結合が実行される場合、トピック内のパーティションの一部のサブセットにビューをマップすることをお薦めします。
22.15.2 ストリーミングを使用した複数のアプリケーションとのKafkaデータの共有
複数のアプリケーションがKafkaデータを使用できるようにするには、Oracle SQL Access to Kafkaを使用してKafka表をユーザー表にストリーミングします。
Kafkaデータを複数のOracleユーザーと共有することで、この表が特定のグループIDに関連付けられないようにするために、アプリケーション・ユーザーに、PL/SQLプロシージャDBMS_KAFKA.EXECUTE_LOAD_APP
を使用してOracle SQL Access to Kafkaをロード・モードで実行させて、そのユーザーが所有する表を作成することをお薦めします。このオプションを使用すると、単一のアプリケーション・インスタンスがPL/SQLのロード・プロシージャを定期的に実行し、すべての新しいデータをKafkaトピックからOracle Database表に増分的にロードします。データが表にロードされた後、一時表に適用される制限なく、その表へのアクセス権を付与された標準のOracle Databaseアプリケーションからアクセス可能にできます。
22.15.3 Kafka表の削除および再作成
KafkaオフセットはDBMS_KAFKA
メタデータ表によって管理されるため、Kafkaトピック構成の変更には、Oracle SQL Access to Kafkaアプリケーションに対する手動更新が必要になる場合があります。
Oracleアプリケーション・インスタンスが、どのKafka表コンテンツが読み取られたか、およびそれがどこで読み取られたかを識別できるようにするには、Kafkaトピックのパーティション・オフセットをアプリケーション・インスタンスごとに追跡する必要があります。
Kafkaでは、オフセットをコミットするための3つのモデルがサポートされています:
- 自動コミット: Kafkaが、短い時間スケジュールでフェッチされた最後のオフセットをコミットします
- 手動コミット: アプリケーションが、オフセットをコミットするリクエストをKafkaに送信します
- アプリケーション管理のコミット: Kafkaのコミットはアプリケーションによって完全に管理されます。
Oracleでは、アプリケーション管理のコミットが使用されます。これらのコミットでは、Kafkaはこれを、Kafkaに明示的にコミットすることなく手動コミットを宣言するアプリケーションとみなします。オフセットは、DBMS_KAFKA
メタデータ表で排他的に記録および保守されます。これらの表は、Oracle DatabaseのACIDトランザクション・プロパティによって保護されます。トランザクションの整合性を保証するために、Oracle SQL Access to Kafkaでは、Kafka自動コミットまたはKafka手動コミットをサポートしていません。
Kafkaトピックを削除して再作成する場合は、シナリオに応じて、その表を手動で更新する必要があります:
例22-13 同じパーティションを持つビューの削除およびリセット
パーティションの数が元のKafkaトピック構成と同じままである場合、ビューをリセットしてOracle SQL Access to Kafkaビューにリセットし、再作成されたトピック内のKafkaパーティションの先頭から処理を開始する必要があります。ビューをリセットするには、プロシージャDBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML')
をコールします。ここで、view_name
はビューの名前です。
例22-14 パーティションが少ないビューの削除およびリセット
このオプションは使用できません。パーティションの数が元のKafkaトピック構成より少ない場合は、このトピックに関連付けられたOracle SQL Access to Kafkaアプリケーションを削除して再作成する必要があります。
例22-15 パーティションが多いビューの削除およびリセット
パーティションの数が元のKafkaトピック構成より多い場合は、プロシージャDBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML')
(view_name
はビューの名前)をコールしてOracle SQL Access to Kafkaビューをリセットしてから、このトピックを使用してOracle SQL Access to KafkaアプリケーションごとにプロシージャDBMS_KAFKA.ADD_PARTITIONS
をコールする必要があります。
22.16 アプリケーション用のKafkaクラスタ・アクセス・モードの選択
Oracle SQL Access to Kafkaを使用するには、アプリケーションに必要なデータ・アクセスのモードを決定します。
- Oracle Database表へのKafkaレコードの増分ロードの構成
アプリケーションがKafkaトピックからOracle Database表にデータを増分ロードできるようにするには、Oracle SQL Access to Kafkaをロード・モードで使用します。 - Oracle SQL問合せでのKafkaレコードへのストリーミング・アクセス
Kafkaトピックに、トピックの先頭から、またはKafkaトピックの特定の開始点から順次アクセスするには、Oracle SQL Access to Kafkaをストリーミング・モードで使用します。 - Oracle SQL問合せでのKafkaレコードに対するシーク可能なアクセス
2つのタイムスタンプ間でKafkaレコードにランダムにアクセスするには、Oracle SQL Access to Kafkaをシーク可能モードで使用します
親トピック: Oracle SQL Access to Kafka
22.16.1 Oracle Database表へのKafkaレコードの増分ロードの構成
アプリケーションがKafkaトピックからOracle Database表にデータを増分ロードできるようにするには、Oracle SQL Access to Kafkaをロード・モードで使用します。
EXECUTE_LOAD_APP
プロシージャを使用して増分ロードを実行するようにOracle SQL Access to Kafkaを構成すると、Kafkaデータを標準のOracle表に移動できます。この表には、Oracle SQL Access to Kafka一時表を使用するときに1つのリーダー制約を課さずに複数のアプリケーションからアクセスできます。
KafkaデータをOracle Database表に増分ロードするために、アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_LOAD_APP
をコールして、後続のDBMS_KAFKA.EXECUTE_LOAD_APP
コールの状態を初期化することによって、ロード・アプリケーションであることを宣言します。DBMS_KAFKA.CREATE_LOAD_APP
プロシージャは、トピックのすべてのパーティションに対して単一のビューを作成します。
トピック全体のデータが不要な場合は、DBMS_KAFKA.INIT_OFFSET[_TS]
プロシージャをコールしてKafkaデータをロードするためのKafkaトピック・パーティションの開始点を設定するようにアプリケーションを構成することもできます。
DBMS_KAFKA.EXECUTE_LOAD_APP
プロシージャはアプリケーション・ループでコールされ、前のコールが中断した場所からKafkaトピックの現在の高水位標にデータをロードします。この手順は自律型トランザクションで実行されます。
KafkaトピックからOracle Database表にデータをロードするには:
- DBMS_KAFKA.CREATE_LOAD_APPを使用して、Oracle SQL Access to Kafkaロード・アプリケーションを作成します
- オプションで、DBMS_KAFFA_INIT_OFFSET_TSまたはDBMS_KAFKA_INIT_OFFSETを使用して、最初に読み取られるKafkaレコードを設定します
- 完了までLOOPします
- DBMS_KAFKA.EXECUTE_LOAD_APPを使用して、中断した場所から現在の最高水位標までのKafkaデータをロードします。
- DBMS_KAFKA.DROP_LOAD_APPを使用して、ロード・アプリケーションを削除します
22.16.2 Oracle SQL問合せでのKafkaレコードへのストリーミング・アクセス
Kafkaトピックに、トピックの先頭から、またはKafkaトピックの特定の開始点から順次アクセスするには、Oracle SQL Access to Kafkaをストリーミング・モードで使用します。
アプリケーションでKafkaトピックに順次アクセスする必要がある場合、Oracle SQL Access to Kafkaをストリーミング・モードで構成します。このモードでは、Oracle SQL Access to Kafka一時表を使用したSQL問合せで、アプリケーション処理ループ内でKafkaレコードに順次アクセスできます。このユースケースでは、アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_STREAMING_APP
をコールして、KafkaビューへのOracle SQLアクセスの後続の問合せの状態を初期化することにより、ストリーミング・アプリケーションであることを宣言します。この手順では、ビューの作成に加えて、ビューごとにグローバル一時表も作成します。また、INIT_OFFSET[_TS]プロシージャを使用して、アプリケーションのKafkaトピック・パーティションの開始点を設定することもできます。開始点として設定すると、最初の問合せが開始点からKafkaデータを読み取ります。これにより、アプリケーションが処理ループで次のステップを実行できるようになります:
DBMS_KAFKA.CREATE_STREAMING_APP
をコールして、Oracle SQL Access to Kafkaストリーミング・アプリケーションを作成します。- (オプション)
DBMS_KAFFA_INIT_OFFSET_TS
またはDBMS_KAFKA_INIT_OFFSET
をコールして、最初に読み取るKafkaレコードを設定します。 - 完了までLOOPします:
- DBMS_KAFKA.LOAD_TEMP_TABLEをコールして、Kafkaから次の行セットを含むグローバル一時表をロードします
- 取得されたプロセス・データをOSAKグローバル一時表からSELECTします
- 処理が成功した場合は、
DBMS_KAFKA.UPDATE_OFFSET
をコールして、最後に読み取られたKafkaオフセットを更新します COMMIT
を使用してオフセット追跡情報をコミットします。
- 終了したら、DBMS_KAFKA.DROP_STREAMING_APPをコールしてアプリケーションを削除します。
PL/SQLプロシージャDBMS_KAFKA.UPDATE_OFFSET
は、Oracle SQLアクセスで識別されるすべてのパーティションについて、KafkaグループIDのKafkaパーティション・オフセットをKafkaビューに透過的に進めます。これにより、DBMS_KAFKA.LOAD_TEMP_TABLE
のすべてのコールで、一連の新しい未読のKafkaレコードが取得され、処理されます
トランザクションがまだ開始されていない場合、UPDATE_OFFSET
はOracleトランザクションを開始し、メタデータ表に最後のオフセットを記録することに注意してください。このため、トランザクションがセッション情報を失わないようにするには、UPDATE_OFFSET
をコールするたびにトランザクションをコミットするようにアプリケーションを構成する必要があります。トランザクションをコミットした後、Oracle SQL Access to KafkaによってOracleトランザクション内のオフセットが管理されるため、レコードは失われることも、再度読み取られることもありません。トランザクションが完了しなかった場合、オフセットは進められません。アプリケーションは、データの読取りを再開する場合、以前の読取りを停止した場所からKafkaデータの読取りを再開できます。
22.16.3 Oracle SQL問合せでのKafkaレコードに対するシーク可能なアクセス
2つのタイムスタンプ間でKafkaレコードにランダムにアクセスするには、Oracle SQL Access to Kafkaをシーク可能モードで使用します
Oracle SQL Access to Kafkaのシーク可能モードでは、アプリケーションは、(通常は、ストリーミング・アクセスを行うピア・アプリケーションによって識別される)目的のタイムスタンプ間でKafkaレコードを読み取ることができます。このモードでは、DBMS_KAFKA.LOAD_TEMP_TABLE
プロシージャが一時表に移入する時間範囲を定義する開始タイムスタンプと終了タイムスタンプを指定します。アプリケーションは、シーク可能モードでKafkaにアクセスするための状態を初期化するためにPL/SQLプロシージャDBMS_KAFKA.CREATE_SEEKABLE_APP
をコールして、シーク可能アプリケーションであることを宣言します。この手順では、トピックのすべてのパーティションにわたってビューおよび対応するグローバル一時表を作成します。DBMS_KAFKA.SEEK_OFFSET_TS
プロシージャがコールされ、問合せ元の時間範囲が指定されます。アプリケーションは、DBMS_KAFKA.LOAD_TEMP_TABLE
プロシージャをコールする前にSEEK_OFFSET_TS
をコールして、次の行セットを含む一時表をロードします。
2つのタイムスタンプ間のKafkaレコードにアクセスするために「シーク可能」モードでKafkaデータを問い合せるには
- DBMS_KAFKA.CREATE_SEEKABLE_APPを使用して、Oracle SQL Access to Kafkaシーク可能アプリケーションを作成します
- 完了までLOOPします
- DBMS_KAFKA.SEEK_OFFSET_TSを使用して、Kafkaトピック内のユーザー定義時間範囲を検索します
- DBMS_KAFKA.LOAD_TEMP_TABLEをコールして、Kafkaから行セットを含むグローバル一時表をロードします
- OSAKグローバル一時表からSELECTします
- データを処理します
- アプリケーションで実行した場合、DBMS_KAFKA.DROP_SEEKABLE_APPを使用します
22.17 Oracle SQL Access to Kafkaアプリケーションの作成
LOADアプリケーションでKafkaデータを問い合せるには、次のプロシージャを使用してKafkaデータをOracle Database表にロードします。
ロード・プロシージャの一般的な用途は次のとおりです:
DBMS_KAFKA.CREATE_LOAD_APP
: このプロシージャは、Oracle表へのロードの設定に使用されます
DBMS_KAFKA.INIT_OFFSET[_TS] (OPTIONAL)
: このプロシージャは、一連のロード操作の開始点を制御するために、すべてのトピック・パーティションでオフセットを設定するために使用されます。このプロシージャは、プロシージャを実行するKafkaトピックから新しい行をロードする必要がなくなるまで繰り返します。
DBMS_KAFKA.EXECUTE_LOAD_APP
: このプロシージャは、Kafkaトピックからすべてのトピック・パーティションの高水位標に新しい未読レコードをロードするために使用されます
DBMS_KAFKA. DROP_LOAD_APP
: このプロシージャは、プロシージャを実行しているKafkaトピックからのロードが完了したときに使用します。
- Oracle SQL Access to Kafkaを使用したロード・アプリケーションの作成
Oracle Database表にデータをロードする場合は、ロード・モードDBMS_KAFKA
を使用します。 - Oracle SQL Access to Kafkaを使用したストリーミング・アプリケーションの作成
Oracle Database表にデータをロードする場合は、ロード・モードDBMS_KAFKA
を使用します。 - Oracle SQL Access to Kafkaを使用したシーク可能なアプリケーションの作成
過去に発生した問題を調査し、開始タイムスタンプと終了タイムスタンプの間のKafkaトピックにランダムにアクセスする場合は、シーク可能モードのDBMS_KAFKA
を使用します。
親トピック: Oracle SQL Access to Kafka
22.17.1 Oracle SQL Access to Kafkaを使用したロード・アプリケーションの作成
Oracle Database表にデータをロードする場合は、ロード・モードDBMS_KAFKA
を使用します。
LOAD
アプリケーションに対してDBMS_KAFKA.EXECUTE_LOAD_APP
をコールできるのは、1つのアプリケーション・インスタンスのみです。
例22-16 DBMS_KAFKA.EXECUTE_LOAD_APPを使用した表へのデータのロード
この例では、ロード・アプリケーション用に1つのビューおよび関連する一時表を作成します。Kafkaクラスタ名はExampleCluster、アプリケーション名はExampleAppです。Kafkaトピックは、4つのパーティションを持つトピックであるExampleTopicです:
DECLARE
v_options VARCHAR2;
BEGIN
v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
SYS.DBMS_KAFKA.CREATE_LOAD_APP (
‘ExampleCluster’,
‘ExampleApp’,
‘ExampleTopic’,
v_options);
END;
/
例22-17 DBMS_KAFKA.EXECUTE_LOAD_APPを使用した表へのデータの定期的なロード
一連のアプリケーション・ビューからKafkaデータを処理するかわりに、最新のデータを定期的に表にフェッチし、単にKafkaからOracle Database表にデータをロードすることを選択することもできます。この例のDBMS_KAFKA.EXECUTE_LOAD_APP
プロシージャは、Kafkaクラスタから最新データを取得し、このデータを表ExampleLoadTable
に挿入します。この表のデータを使用するアプリケーションには、DBMS_KAFKA.INIT_OFFSET[_TS]
をコールしてロードの開始点を設定するオプションがあります。
DECLARE
v_records_inserted INTEGER;
BEGIN
SYS.DBMS_KAFKA.EXECUTE_LOAD_APP (
‘ExampleCluster’,
‘ExampleLoadApp’,
‘ExampleLoadTable’,
v_records_inserted);
END;
例22-18 DBMS_KAFKA.DROP_LOAD_APPまたはDBMS_KAFKA.DROP_ALL_APPSを使用したKafkaビューおよびメタデータの削除
Oracle SQL Access to Kafkaロード・アプリケーションが不要になった場合は、DBMS_KAFKA.DROP_LOAD_APP
をコールしてビューおよびメタデータを削除できます。次の例では、KafkaクラスタはExampleCluster
、アプリケーションはExampleApp
です。
EXEC SYS.DBMS_KAFKA.DROP_LOAD_APP
(‘ExampleCluster’, ‘ExampleApp’);
Oracle SQL Access to Kafkaアプリケーションへの1つ以上のOracle SQLアクセスが存在しなくなった場合は、DBMS_KAFKA.DROP_ALL_APPS
をコールして、特定のクラスタのすべてのアプリケーションを削除できます
EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS
(‘ExampleCluster’);
22.17.2 Oracle SQL Access to Kafkaを使用したストリーミング・アプリケーションの作成
Oracle Database表にデータをロードする場合は、ロード・モードDBMS_KAFKA
を使用します。
ストリーミングを使用すると、データを大規模に処理できます。Oracle SQL Access to Kafkaをストリーミング・モードで使用して、複数のアプリケーション・インスタンスを作成できます。複数のインスタンスにより、アプリケーションは、1つ以上のスレッド、プロセスまたはシステムで同時に実行されているアプリケーション・インスタンス間でKafkaデータを分析するワークロードをスケール・アウトおよび分割できます。
Oracle SQL Access to Kafkaストリーミング・アプリケーションには、Kafkaグローバル一時表への一連の専用Oracle SQLアクセスと、KafkaビューへのOracle SQLアクセスが含まれます。これらの一時表およびビューは、Kafkaトピック内のパーティションから新しい未読レコードを取得するために使用できます。
また、KafkaクラスタでアクティブなトピックおよびKafkaトピックに関するパーティション情報を調べるために使用されるメタデータ・ビューも作成します(存在しない場合)。このビューは一度作成されたら、同じクラスタを共有するすべてのアプリケーションに対応します。
各Oracle SQL Access to Kafkaグローバル一時表およびその関連ビューは、Oracle SQL Access to Kafkaアプリケーションの1つのインスタンスによって排他的に使用されます。
各アプリケーション・インスタンスはLOAD_TEMP_TABLE
をコールします。これにより、専用のOracle SQL Access to Kafkaグローバル一時表に、関連付けられたビューから取得されたKafka行が移入されます。これにより、アプリケーションは、Oracle SQL Access to Kafkaグローバル一時表のコンテンツに対して1つ以上のSQL問合せを実行できるようになります。アプリケーションが現在の一連のKafka行で実行されると、UPDATE_OFFSET
およびCOMMIT
をコールします。
STREAMING
モード・アプリケーションは、アプリケーション用に必要なOracle SQL Access to Kafkaビューおよび一時表の数を選択するようアプリケーションを構成できるという点で、LOAD
またはSEEKING
アプリケーションとは異なります。Oracle SQL Access to Kafkaアプリケーションの他のタイプと同様、各アプリケーション・インスタンスは、1つの一意のOracle SQL Access to Kafka一時表を排他的に問い合せます。各1つの一意のビューおよびグローバル一時表名には、クラスタ名、アプリケーション名およびアプリケーション・インスタンス識別子(ID)が含まれます。
アプリケーションを作成する際には、作成する1つの一意のビューおよび一時表のペアの数が1からNの間であることに注意してください。ここで、NはKafkaトピック内のパーティションの数です。
実行時に、各アプリケーション・インスタンスは独自のユーザー・セッションで実行され、1つの一意のグローバル一時表およびその関連ビューを処理します。したがって、アプリケーション・インスタンスを同時に実行するには、少なくともKafkaトピック内のパーティションと同じ数(つまり、N
の値)のセッションをユーザーに割り当てる必要があります。view_count
がユーザー当たりの最大セッション数を超えると、このコールは失敗し、ユーザーに割り当てられているセッションが不足していることを示すエラーが表示されます。特定のOracle SQL Access to Kafkaビューおよびその関連グローバル一時表にバインドされたKafkaパーティションの数は、作成されたビューの数および存在するパーティションの数によって異なります。Oracle SQL Access to Kafkaは、各ビューに割り当てられるパーティションの数を均等に分散します。
例22-19 DBMS_KAFKA.CREATE_STREAMING_APPを使用した表へのデータのストリーミング
この例では、ExampleTopic
という名前のトピックのデータを使用して、ストリーミング・モード・アプリケーションの一連の4つのビューおよび関連する一時表を作成します。このトピックには4つのパーティションがあり、各ビューおよび一時表は1つのパーティションに関連付けられています:
DECLARE
v_options VARCHAR2;
BEGIN
v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
‘ExampleCluster’,
‘ExampleApp’,
‘ExampleTopic’,
v_options,
4);
END;
/
例22-20 DBMS_KAFKA.CREATE_STREAMING_APPを使用した単一の表へのデータのロード
この例では、ストリーミング・モードを使用して、トピックの4つのパーティションすべてに関連付けられているアプリケーションに対して1つのビューと関連する一時表を作成します:
DECLARE
v_options VARCHAR2;
BEGIN
v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
‘ExampleCluster’,
‘ExampleApp’,
‘ExampleTopic’,
v_options,
1);
END;
/
例22-21 DBMS_KAFKA.DROP_STREAMING_APPまたはDBMS_KAFKA.DROP_ALL_APPSを使用したKafkaビューおよびメタデータの削除
Oracle SQL Access to Kafkaロード・アプリケーションが不要になった場合は、DBMS_KAFKA.DROP_STREAMING_APP
をコールしてビューおよびメタデータを削除できます。次の例では、KafkaクラスタはExampleCluster
、アプリケーションはExampleApp
です。
EXEC SYS.DBMS_KAFKA.DROP_STREAMING_APP
(‘ExampleCluster’, ‘ExampleApp’);
Oracle SQL Access to Kafkaアプリケーションへの1つ以上のOracle SQLアクセスが存在しなくなった場合は、DBMS_KAFKA.DROP_ALL_APPS
をコールして、特定のクラスタのすべてのアプリケーションを削除できます
EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS
(‘ExampleCluster’);
22.17.3 Oracle SQL Access to Kafkaを使用したシーク可能アプリケーションの作成
過去に発生した問題を調査し、開始タイムスタンプと終了タイムスタンプの間のKafkaトピックにランダムにアクセスする場合は、シーク可能モードのDBMS_KAFKA
を使用します。
シーク可能モードでKafkaトピックにアクセスする前に、DBMS_KAFKA.CREATE_SEEKABLE_APP
を使用してOracle SQL Access to Kafkaアプリケーションを作成する必要があります。このパッケージは、シーク可能モードで使用できるアプリケーションを作成します。
Oracle SQL Access to Kafkaをシーク可能モードで使用すると、Kafkaデータを使用して、過去に発生した問題を調査できます。データがまだKafkaストリーム内に存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APP
をコールしてシーク可能アプリケーションを作成できます。シーク可能モード・アプリケーションを作成したら、プロシージャDBMS_KAFKA.SEEK_OFFSET_TS
をコールして、データ・レコードの範囲を取得するようOracle SQL Access to Kafkaビューにリクエストできます。たとえば、生産の問題が午前3時頃に発生し、原因を調査する必要があることをITコンサルタントに知らせたとします。コンサルタントは次のプロシージャを使用して、一時表をロードしてから、その時間の1時間分に相当するデータを取得することを選択できます:
アプリケーションを作成する際には、作成する1つの一意のビューおよび一時表のペアの数が1からNの間であることに注意してください。ここで、NはKafkaトピック内のパーティションの数です。
実行時に、各アプリケーション・インスタンスは独自のユーザー・セッションで実行され、1つの一意のグローバル一時表およびその関連ビューを処理します。したがって、アプリケーション・インスタンスを同時に実行するには、少なくともKafkaトピック内のパーティションと同じ数(つまり、N
の値)のセッションをユーザーに割り当てる必要があります。view_count
がユーザー当たりの最大セッション数を超えると、このコールは失敗し、ユーザーに割り当てられているセッションが不足していることを示すエラーが表示されます。特定のOracle SQL Access to Kafkaビューおよびその関連グローバル一時表にバインドされたKafkaパーティションの数は、作成されたビューの数および存在するパーティションの数によって異なります。Oracle SQL Access to Kafkaは、各ビューに割り当てられるパーティションの数を均等に分散します。
例22-22 DBMS_KAFKA.CREATE_SEEKABLE_APPを使用したKafkaデータの日付範囲の検索
この例では、生産の問題が午前3時頃に発生し、原因を調査する必要があることをITコンサルタントに知らせたとします。コンサルタントは、コンサルタントは次のプロシージャを使用して、一時表をロードしてから、その時間の1時間分に相当するデータを取得することを選択できます。ここで、KafkaクラスタはEXAMPLECLUSTER
、列はEventCol
およびExceptionCol
です:
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2022/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2022/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT EventCol, ExceptionCol FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
例22-23 DBMS_KAFKA.CREATE_SEEKABLE_APPを使用した異常に関連するレコードの検索
Kafkaストリームへの順次アクセスを使用するアプリケーションが潜在的な異常を検出し、異常表に行を挿入するとします。異常表には、Kafkaタイムスタンプと、トレースに重要として指定されたその他のデータが含まれます。別のアプリケーションは、この情報を使用して、疑われるレコード周辺のレコードを取得し、この異常に関連する他の問題があるかどうかを確認できます。この例では、ITコンサルタントが調べる異常に関連する列は、UserCol
およびReqeustCol
です。これを実現するには、次のプロシージャを実行して一時表をロードし、アプリケーション・ロジックを選択して結果に適用します:
SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE
(ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT UserCol, RequestCol FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic
例22-24 DBMS_KAFKA.DROP_SEEKABLE_APPまたはDBMS_KAFKA.DROP_ALL_APPSを使用したKafkaビューおよびメタデータの削除
Oracle SQL Access to Kafkaロード・アプリケーションが不要になった場合は、DBMS_KAFKA.DROP_SEEKABLE_APP
をコールしてビューおよびメタデータを削除できます。次の例では、KafkaクラスタはExampleCluster
、アプリケーションはExampleApp
です。
EXEC SYS.DBMS_KAFKA.DROP_SEEKABLE_APP
(‘ExampleCluster’, ‘ExampleApp’);
Oracle SQL Access to Kafkaアプリケーションへの1つ以上のOracle SQLアクセスが存在しなくなった場合は、DBMS_KAFKA.DROP_ALL_APPS
をコールして、特定のクラスタのすべてのアプリケーションを削除できます
EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS
(‘ExampleCluster’);
22.18 アプリケーションでのKafkaクラスタ・アクセスの使用
アプリケーションでのKafkaクラスタ・データ・アクセスの使用方法を学習します。
- Oracle SQL Access to Kafkaの問題を診断する方法
Oracle SQL Access to Kafkaで問題が発生した場合は、次のガイドラインを使用して原因を特定し、問題を解決します。 - Oracle SQL Access to Kafkaの問題の識別および解決
問題の識別および解決を支援するために、Oracle SQL Access to Kafkaには、クラスタ表にトレース・ファイル、メッセージ表、操作結果表および状態列があります。
親トピック: Oracle SQL Access to Kafka
22.18.1 Oracle SQL Access to Kafkaの問題を診断する方法
Oracle SQL Access to Kafkaで問題が発生した場合は、次のガイドラインを使用して原因を特定し、問題を解決します。
Oracle SQL Access to Kafkaの主な診断問題を次に示します:
初期接続の確立の失敗
このタイプのエラーは次のとおりです:
- 不正な起動サーバー・リスト
- 不正な資格証明情報
- ネットワーキング構成の問題
初回アクセス時の失敗
DBMS_KAFKA CREATE_LOAD_APP
、CREATE_STREAMING_APP
またはCREATE_SEEKABLE_APP
をコールしたときの初回アクセスの失敗には通常、次の原因があります:
- 欠落しているか不正なトピック
- 接続の問題
レコード更新中の失敗
通常、このタイプの失敗には次の原因があります:- 接続の問題
- 内部メタデータまたはロジックの問題
- レコードの欠落
- Oracle SQL Access to Kafkaビューの形状が入力と一致しない解析エラー。
OracleアプリケーションおよびOracle SQL Access to KafkaがKafkaデータ入力に対応する際の失敗。
このタイプの失敗にはリソースのチューニングが必要です。これらは、Kafkaクラスタ内のトピックへの行の取込み率が、Oracle DatabaseでKafkaレコードを消費するOracle Databaseの能力に近いかそれを超えている場合に発生します。たとえば、一定時間後、Kafkaの未読レコードがOracle Databaseによって消費される前に期限が来たためにKafkaによって無効にされます。
この種のエラーは、ワークロードを決定することで回避または修正します。たとえば、問合せの頻度、Oracle SQL Access to KafkaビューへのOracle SQLアクセスごとの問合せ当たりに処理された典型的なレコード数、使用されている並列度、および分析を実行するアプリケーションで費やされた時間を確認します。ワークロードを決定したら、アプリケーション・スタックがワークロードを満たせることを確認します。アプリケーションおよびOracle DatabaseがアプリケーションまたはOracle Databaseリソースを制限することなく、ピークのKafkaレコードを処理できるように、リソースのサイズを設定します。
スループット・レートが上昇し始めたことがわかると、いくつかのことが役立ちます。たとえば、アプリケーション・ユーザーの並列度を増やしたり、より多くのアプリケーション・インスタンスを起動したり、Kafkaクラスタにパーティションを追加します。
例22-25 Oracle SQL Access to Kafka (OSAK)アプリケーション・エラーの解決
OSAKアプリケーションEXAMPLEAPP
がKafkaクラスタEXAMPLECLUSTER
からデータをロードしており、次のようなエラーが表示されるとします:
ORA-62721: The specified parallel hint [%0!s] exceeds the granule count {%1!s}.
このエラーの原因は、指定された値が、グラニュル数によって決定される可能な最大並列度を超えていたことです。このようなエラーはどのように解決しますか。
LOAD_TEMP_TABLE
およびEXECUTE_LOAD_APP
のparallel_hint
パラメータは、特定のSELECT文でデータをフェッチするために実行できるパラレル・プロセスの数を決定する並列度(またはDOP)に関連しています。パラレル問合せを最大限に活用するには、parallel_hint
パラメータを2から最大許容DOPの間に設定する必要があります。最大DOPは、コールを行うユーザーに許可される最大値、またはOSAKビューに関連付けられたパーティション数のいずれか小さい方です。この原因は、アプリケーションを実行しているデータベースまたはユーザー・アカウントのいずれかが最大許容DOPを超えていることです。
この問題を解決するには、グラニュル数以下の値を指定します。グラニュル数は、DBMS_KAFKA.GET_GRANULE_COUNT
関数をコールすることで確認できます:
DECLARE
v_dop INTEGER;
BEGIN
LOOP
v_dop := SYS.DBMS_KAFKA.GET_GRANULE_COUNT(‘ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0’);
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE(‘ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0’);
FOR kafka_record IN (
SELECT kafka_offset offset
FROM ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0)
LOOP
SYS.DBMS_OUTPUT.PUT_LINE (‘Processing record: ‘ || kafka_record.offset);
--application logic to process the Kafka records
END LOOP;
IF (application logic was successful) THEN
--Update internal metadata to confirm Kafka records were successfully processed
SYS.DBMS_KAFKA.UPDATE_OFFSET(‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’);
COMMIT;
ELSE
--add your application logic to correct for any failures
END IF;
END LOOP;
END;
親トピック: アプリケーションでのKafkaクラスタ・アクセスの使用
22.18.2 Oracle SQL Access to Kafkaの問題の識別および解決
問題の識別および解決を支援するために、Oracle SQL Access to Kafkaには、クラスタ表にトレース・ファイル、メッセージ表、操作結果表および状態列があります。
表示される問題の性質を確認し、使用可能なユーティリティを使用して問題を特定して対処します:
- 接続の問題、ロジックの問題またはKafkaアクセス・レイヤー(Kafkaデータ選択によってコールされるOracle実行可能ファイル): トレース・ファイルを確認します。また、
sys.user_kafka_clusters
表の状態列を確認できます。 - DBMS_KAFKAおよびDBMS_KAFKA_ADM APIからの例外:
sys.user_kafka_messages
表のエラー・メッセージを確認します。 - 操作ランタイムの問題: Oracle SQL Access to Kafkaデータ取得のパフォーマンスが予想どおりでない場合、
sys.user_kafka_ops_results
表のメッセージを確認します。
例22-26 接続の問題、ロジックの問題またはKafkaアクセス・レイヤーの問題
トレース・ファイルを使用して問題を識別します。
-
接続関連の問題の場合、詳細はビュー・オブジェクト・トレースから入手できます。有効にするには、イベントを
init.ora
ファイルに追加するか、alter systemコマンドを使用して実行時にシステムを更新します。次のエントリを初期化ファイル(
init.ora
)に追加します。event='trace[KGRK] disk highest'
システムを変更します:
alter system set events 'trace[KGRK] disk highest';
ノート:
init.ora
ファイルの更新を有効にするには、データベースの再起動が必要です。 -
ロジック関連のエラーの場合、すべてのエラー・パスにトレースが含まれます。すべてのメッセージの先頭には文字列
kubsCRK
が付きます。これらのロジック・エラーによっても、SQL例外が発生します。 -
Oracle SQL Access to KafkaアプリケーションのKafkaアクセス・レイヤーのトレース出力は、
TRUE
として渡されたenable引数を使用してDBMS_KAFKA.SET_TRACING
をコールすることで有効になります。トレース出力は、FALSE
として渡されたenable引数を使用して同じ関数をコールすることによって無効になります。たとえば:
アプリケーションが
ExampleApp
である状態で、ExampleCluster
という名前のクラスタのトレースを有効にするには、次のように入力します:DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', true)
そのクラスタのトレースを無効にするには、次のように入力します:
DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', false)
ノート:
トレースを有効にするには、データベースに対して次のイベントがすでに有効になっている必要があります:
event="39431 trace name context forever, level 1" # Enable external table debug tracing
問題が接続の問題であると判断した場合は、sys.user_kafka_clusters
表の状態列を確認します。接続レベルは数値で指定されます:
- CONNECTED (0): この状態は、Kafkaクラスタへの接続が確立されていることを示します。接続の確立中に発生したエラーは、Kafkaデータのリクエストに関する問題を示します。問題を識別するには、
DBMS_KAFKA.SET_TRACING
APIを使用してトレースを有効にし、問題を再現してから、関連するトレース・ファイルでセッションに「kubsCRK
」が含まれるメッセージがないか確認します。user_kafka_messages
表のメッセージも確認します。 - MAINTENANCE (1): この状態は、Kafkaクラスタへの接続が確立されていることを示しますが、接続の確立中に発生するエラーは、Kafkaデータのリクエストに関する問題を示します。この問題を解決するには、
DBMS_KAFKA.SET_TRACING
APIを使用してトレースを有効にし、問題を再現してから、関連するトレース・ファイルでセッションにkubsCRK
が含まれるメッセージがないか確認します。user_kafka_messages
表のメッセージも確認します。 - BROKEN (2): この状態は、Kafkaクラスタへの接続を再確立できないことを示します。トレース・ファイルの機能KUBDのエラーおよびメッセージ表のエラーを探します。
- DEREGISTERED (3): この状態は、OSAK管理者がクラスタの登録解除を強制し、関連するOracle SQL Access to Kafkaビューにアクセスする必要がないことを示します。これは予期された動作であり、エラーではありません。
例22-27 PL/SQLパッケージの問題
Sys.user_kafka_messages table
を確認します。この表には、過去3日間に記録されたメッセージが含まれます。古いデータは1日に1回自動的にパージされます。データに関連付けられているOSAKビューが削除された場合も、メッセージは削除されます。
例22-28 操作ランタイムの問題
SELECT
文を使用して取得された行数が予想より少ないと思われる場合は、sys.user_kafka_ops_results
表のデータを使用して、最後の選択時にKafkaから読み取られたレコード数を確認します。
SELECT
には正しく解析された行のみが含まれるため、取得された行と読み取られたKafkaレコードの差は、Kafkaトピック内のすべてのデータがDBMS_KAFKA CREATE_LOAD_APP
、CREATE_STREAMING_APP
またはCREATE_SEEKABLE_APP
コール中に指定された形式ではないことを示しています。
Kafkaトピック・データが指定された形式でない場合、回答は次のとおりです:
- Kafkaクラスタに公開しているプロデューサを修正します:
- アプリケーションを削除して再作成し、適切な形式(DSVの参照表、AVROのAVROスキーマ)を提供するようにします。
- JSONデータの場合は、アプリケーションを削除して再作成する前に、データが
VARCHAR2 VALUE
列の最大列長を超えているかどうかを確認します。データが最大値を超える場合は、アプリケーションを削除して再作成できますが、今回はオプション・パラメータにオプション"jsond" : "clob"
を追加します。このオプションを使用すると、OSAKは、列をデフォルトの最大サイズVARCHAR2
ではなく、キャラクタ・ラージ・オブジェクト(CLOB)列として作成できます。
親トピック: アプリケーションでのKafkaクラスタ・アクセスの使用