21 Oracle SQL Access to Kafka

Oracle Database 23c以降、Oracle SQL APIを使用して、Oracle SQLを使用してKafkaトピックを動的に問い合せることができます。

Oracle SQL Access to Kafkaは、いくつかの重要な方法で、KafkaおよびOCI Streaming ServiceストリームをOracle Database 23cと統合します。最初に、Oracle Databaseを1つ以上のKafkaトピックに接続できます。データベースが接続された後、Oracle DatabaseでKafkaデータを永続化することなく、Oracle SQLを使用してそのトピックを動的に問い合せることができます。この機能を使用すると、Oracle Databaseで取得されたデータと組み合せてリアルタイム・データを分析できます。また、Oracle SQL Access to Kafkaにより、KafkaトピックをOracle Databaseに損失なく迅速かつスケーラブルにロードできます。DBMS_KAFKA APIは、このプロセス全体の管理を簡略化します。

21.1 Oracle SQL Access to Kafkaバージョン2について

Oracle SQL Access to Kafka (OSaK)には、Oracle SQLからKafkaトピックへの問合せを実行できるOracle Databaseのネイティブ機能が用意されています。

Oracle Database 23c以降では、Oracle SQL Access to Kafkaのバージョン2がOracle Databaseとともにインストールされます。これは、KafkaクラスタにネイティブのOracle Databaseコネクタ・サービスを提供します。これは、DBMS_KAFKAおよびDBMS_KAFKA_ADMパッケージを介してアクセスされる一連の機能で構成されています。

機能

Oracle SQL Access to Kafkaバージョン2では、標準のOracle Database SQLセマンティクスを使用してKafkaストリーミング・データをOracle Database表とともに処理でき、標準のOracleアプリケーション・ロジック(Oracle JDBCアプリケーションなど)によってデータを処理できます。Oracle SQL Access to Kafkaは、Oracle Databaseに統合されています。このOracle DatabaseでのKafkaアクセスの統合により、外部クライアント・コネクタ・アプリケーションを必要とせずに、KafkaまたはOCI Streaming Serviceによって生成されたデータ・ストリームを使用して、Oracle Database内の表を関連付けることができます。Oracle SQL Access to Kafkaは、Kafkaアプリケーションと同じ方法でOracle Databaseのデータ・ストリームをスケールアップできます。

Oracle SQL Access to Kafkaを使用すると、次を実行できます:

  • ストリーミング・アプリケーションを作成して使用し、未読のKafkaレコードを1回処理します。これらのレコードは、処理後に保持する必要がありません。
  • ロード・アプリケーションを作成して使用し、様々なOracle Applicationsによるアクセスのために、未読のKafkaレコードをOracle Database表に永続的に取得します。この場合、Kafkaレコードは取得され、Oracle Databaseのユーザー表に保持されます。このユースケースは、データ・ウェアハウスに役立ちます。
  • ユーザーが指定したタイムスタンプ間隔に基づいて、シーク・アプリケーションを作成して使用し、Kafkaトピック内のレコードを再度読み取ります。
  • 2つ以上のストリーミング・アプリケーションを作成して使用します。これらのアプリケーションは、2つ以上のKafkaトピックからデータをストリーミングするために使用でき、そこでOracle DatabaseのSQLを使用してそれらを結合できます。

仕組み

Oracle SQL Access to Kafkaバージョン2では、Oracle Databaseシステム生成ビューおよび外部表を使用してKafkaデータにアクセスできます。これらのビューおよび外部表は、DBMS_KAFKAパッケージを使用して、Kafkaアプリケーションへの名前付きOracle SQLアクセスを定義します。通常、これらのビューおよび外部表は、アプリケーションのストリーミング、ロードおよびシークに対して透過的です。

アプリケーションは、操作をOracle Databaseトランザクションとして実行および制御し、データベースのACID (原子性、一貫性、独立性、耐久性)要件に準拠して、トランザクションごとに一意の識別子(トランザクションID)を使用してトランザクションのすべての部分がコミットされるか、すべてがロールバックされるようにします。このトランザクションIDには、エラーの識別およびロールバックに使用できるタイムスタンプが含まれます。Oracle DatabaseトランザクションのACID機能は、障害が発生した場合にレコードを失ったり繰り返したりすることなく、データ・リカバリをサポートします。

Oracle SQL Access to Kafkaで実行されるOracleトランザクションには、Kafkaパーティション・オフセットの管理、およびOracle Databaseのデータベース・メタデータ表へのコミットが含まれます。

Oracle SQL Access to Kafkaがない場合、Kafkaパーティション・オフセットは、アプリケーションまたはKafkaのいずれかで管理する必要がありますが、いずれもトランザクション・セマンティクスをサポートしていません。つまり、システム障害の後、Kafkaレコードが失われるか、アプリケーションによって再処理される可能性があります。Oracle Databaseトランザクションでオフセットを管理すると、これらの問題が回避され、Kafkaデータの独立性と耐久性が向上します。

Oracle SQL Access to KafkaはOracle Databaseで使用でき、PL/SQLおよびSQL問合せとともに使用されるため、Oracle Databaseへのコネクタを提供するために外部クライアント・アプリケーションは必要ありません。

ORA_KAFKA PL/SQLパッケージには、データベース・スキーマへのKafkaクラスタの登録、Kafkaトピックの問合せ、指定されたオフセットまたは指定されたタイムスタンプからのデータの問合せなどを行う関数およびプロシージャがあります。データを格納せずにグローバル一時表を使用するか、ターゲットOracle Databaseのユーザー表にデータを格納するかを選択できます。

使用方法

Oracle SQL Access to Kafkaアプリケーションを使用して、Oracle Databaseで作成されたグローバル一時表またはユーザー表にアクセスし、アプリケーションからデータを取得できます。そのデータは、データのストリーム、または他のデータベースからのデータのスナップショットである場合があり、直接アクセスすることも、Oracle Database表にロードしてアプリケーション内で使用することもできます。

Kafkaグローバル一時表には次の特性があります:

  • グローバル一時表は、アプリケーション・インスタンスの初期に1回ロードされ、アプリケーション・インスタンスの期間中、Kafkaレコードのスナップショットとして使用されます。アプリケーションでは、グローバル一時表とともに標準のOracle SQLを使用できます。
  • グローバル一時表からの各問合せの結果、Kafkaクラスタにトリップされ、同じ行および場合によっては追加行が再取得されます。

対応するグローバル一時表は、Oracle SQL Access to Kafkaビューからのスナップショットを受信します。アプリケーションは、トランザクション内の1つ以上のクエリーにこの一時表を使用します。つまり、グローバル一時表は一度ロードされ、使用されます。Kafkaオフセットが拡張され、アプリケーションがコミットされます。これは、グローバル一時表にロードされたKafkaレコードを使用して終了されることを示します。

一時表からの読取りは、次の理由のために有益です:

  • 繰返し可能な読取りは、複数の問合せから明示的に、または結合内で暗黙的にサポートされます
  • 問合せオプティマイザの信頼できる統計が収集されます
  • 一時表のロード時にKafkaに対して行われるトリップは1回のみです。後続の問合せでは、Kafkaクラスタへのトリップは行われません。
  • グローバル一時表は、標準のOracle表と結合できます。Oracle SQL Access to Kafka一時表をOracle Database表に結合すると、KafkaデータでOracle Database機能を使用する機能が向上します。
  • Oracle Databaseの成熟した最適化方針および処理方針を活用して、表の効率的な結合に必要なコード・パスを最小化できます。

21.2 Oracle SQL Access to Kafkaのグローバル表およびビュー

Oracle SQL Access to KafkaがKafka STREAMING、SEEKINGおよびLOADアプリケーションにアクセスする方法、およびグローバル一時表で使用される一意のORA$接頭辞について学習します。

KafkaトピックのSTREAMINGおよびSEEKINGにOracle SQL Access to Kafka (OSAK)を使用するアプリケーションは、PL/SQLを使用してOSAKプロシージャをコールし、対応するOracle SQL Access to Kafkaビューからの問合せの結果を使用してグローバル一時表をロードします。LOADアプリケーションは、EXECUTE_LOAD_APPプロシージャを使用して既存のOracle Database表に増分ロードを実行するため、LOADアプリケーションではグローバル一時表は必要ありません。STREAMING、SEEKINGおよびLOADアプリケーションの場合、OSAKは3つのすべての場合でビューおよび外部表を作成します。

Oracle SQL Access to Kafkaビューと一時表の両方に、Oracle SQL Access to Kafkaによって作成されたオブジェクトとしてこれらを識別する一意のORA$接頭辞があります。

ORA$DKV (ビューの場合)およびORA$DKX (表の場合)は、Oracle SQL Access to Kafkaによって生成されたビューおよび外部表の接頭辞で、Kafkaからユーザー所有の表またはグローバル一時表にデータをロードするためにDBMS_KAFKAをコールします。通常、これらのビューおよび外部表は内部オブジェクトとして扱われ、Oracleアプリケーションによって直接操作されることはありません。

ORA$DKVGTTは、ストリーミングまたはシーク・アプリケーションからロードされるグローバル一時表であることを示す接頭辞です。このグローバル一時表は、DBMS_KAFKA.LOAD_TEMP_TABLEのコール時に透過的にロードされます。

21.3 Oracle SQL Access to Kafka問合せの実行方法の理解

Oracle SQL Access to KafkaはKafkaストリーミング・データにアクセスしますが、問合せはOracle Databaseグローバル一時表で実行されるため、いくつかの利点があります。

一般的なアプリケーションは、Oracle SQL Access to Kafkaビューを直接問い合せません。かわりに、次のようにします:

  • Oracle SQL Access to Kafkaビューからの各問合せでは、現在のオフセットから現在の高水位標までKafkaから直接データをフェッチします。行は継続的に追加されるため、ビューからの各問合せにより、より多くの行が取得される可能性が高くなります。したがって、Oracle SQL Access to Kafkaビューでは、複数の問合せから明示的に、または結合内で暗黙的に、繰り返し可能な読取りはサポートされていません。
  • 問合せオプティマイザのためにOracle SQL Access to Kafkaビューから収集された信頼できる統計はありません
  • Oracle SQL Access to Kafka表からの各問合せの結果、Kafkaクラスタにトリップされ、同じ行および場合によっては追加行が再取得されます。これらの問合せ取得はパフォーマンスに影響する可能性があります。

対応する一時表は、Oracle SQL Access to Kafkaビューからのスナップショットを受信します。アプリケーションは、トランザクション内の1つ以上の問合せにこの一時表を使用します。一時表からの読取りは、次の理由のために有益です:

  • 繰返し可能な読取りは、複数の問合せから明示的に、または結合内で暗黙的にサポートされます
  • 問合せオプティマイザの信頼できる統計が収集されます
  • 一時表のロード時にKafkaに対して行われる読取りは1回のみです。後続の問合せでは、データにアクセスするためにKafkaクラスタに戻る必要はありません。

グローバル一時表は、標準のOracle表(Oracle Customer Relationship Management (CRM)表など)と結合できます。

Oracle SQL Access to Kafka一時表をOracle Database表に結合すると、次の利点が得られます:

  • Oracle Databaseの成熟した最適化方針および実行方針を活用して、表の効率的な結合に必要なコード・パスを最小化する
  • Oracle Database ACIDトランザクション処理(原子性、一貫性、独立性および耐久性)のセキュリティを備えたOracle Databaseトランザクション・セマンティクスを取得し、データに対するすべての変更がアプリケーションによって制御された単一の操作であるかのように実行されるようにする
  • Kafkaパーティション・オフセットを管理し、Oracle Databaseのデータベース・メタデータ表にコミットすることで、システム障害が発生した後、Kafkaレコードを含むこれらのOracle Databaseトランザクションが失われたりアプリケーションによって再処理されないようにする。

21.4 Oracle DatabaseへのKafkaデータのストリーミング

Oracle SQL Access to Kafkaを使用すると、標準のSQLセマンティクスを使用してKafkaストリーミング・データをOracle Database表で処理できます。

Apache Kafkaは、多くのストリーミング・ソースからデータを取得および統合し、このデータに対して分析を実行できるようにするために一般的に使用されます。通常、これを行うには、すべてのKafkaレコードをデータベースにロードし、このデータを分析のためにデータベース表と組み合せる必要があります(短期調査または長期分析用)。

Oracle SQL Access to Kafkaでは、標準のSQL、PL/SQLおよびその他のデータベース開発ツールを使用して、KafkaからOracle Databaseへのロードを実現し、JDBCアプリケーションなどの標準のOracleアプリケーション・ロジックを使用してこのデータを処理できます。Oracle SQL Access to Kafkaでは、ロードするKafkaトピックのすべてのパーティションにマップするビューを作成できます。より多くのデータをロードするためのOracle SQL Access to Kafkaの各コールにより、このビューが問い合せられ、その結果、最後に読み取られた前の時点から現在のデータの高水位標オフセット(すべてのKafkaパーティションに完全に挿入された最後のメッセージのオフセット)までのKafkaトピックのすべてのパーティションが問い合せられます。Kafkaパーティションから取得されたデータは、一時的なOracle Database表にロードされます。

これらのOracle SQL Access to Kafkaビューは、Kafkaアプリケーション・インスタンスのように動作します。高水位標オフセットに達するまで、特定のオフセットから始まるKafkaからレコードを読み取ります

Oracle SQL Access to Kafkaでビューが作成されると、対応するグローバル一時表も作成されます。アプリケーションにより、Oracle SQL Access to Kafka PL/SQLプロシージャがコールされ、このグローバル一時表が、対応するOracle SQL Access to Kafkaビューからの問合せの結果とともにロードされます。

グローバル一時表は、標準のOracle表(Oracle Customer Relationship Management (CRM)表など)と結合できます。

Oracle SQL Access to Kafka一時表をOracle Database表に結合すると、次の利点が得られます:

  • Oracle Databaseの成熟した最適化方針および実行方針を活用して、表の効率的な結合に必要なコード・パスを最小化する
  • Oracle Database ACIDトランザクション処理(原子性、一貫性、独立性および耐久性)のセキュリティを備えたOracle Databaseトランザクション・セマンティクスを取得し、データに対するすべての変更がアプリケーションによって制御された単一の操作であるかのように実行されるようにする
  • Kafkaパーティション・オフセットを管理し、Oracle Databaseのデータベース・メタデータ表にコミットすることで、システム障害が発生した後、Kafkaレコードを含むこれらのOracle Databaseトランザクションが失われたりアプリケーションによって再処理されないようにする。

21.5 タイムスタンプによるKafkaデータ・レコードの問合せ

Oracle SQL Access to Kafkaのシーク可能モードは、Kafkaデータに関連付けられたタイムスタンプに基づいて、Kafkaに格納されている古いデータを問い合せる上で役に立ちます。

異常が発生した場合は、Oracle SQL Access to Kafkaを使用して、指定した時間範囲内の異常に関連するKafkaデータの識別を支援できます。

たとえば、コンピュータ会社に複数のサイトがあるとします。各サイトにはラボがあり、建物およびラボへのアクセスはすべてキー・カード・アクセスによって保護されています。同社には膨大な数の従業員がおり、オフィス・スペースを必要とする人もいれば、ラボでマシンを維持している人もいれば、換気の問題、許可されていないアクセス、サイトの一般的な使用などの問題について建物を監視している人もいます。このシナリオでは、Kafkaトピックは次のもので構成されている場合があります:

  • キー・カードの使用(KCdata)
  • 施設の監視(Fdata)
  • 稼働時間、アクセス、侵入検出などのシステム監視(Sdata)

Kafkaデータを読み取ってOracleデータと組み合せる間に通常のイベントが検出された場合、アプリケーションは、異常なイベントを含むレコードのタイムスタンプとともに異常をログに記録できます。次に、2つ目のアプリケーションが、これらのエラーを読み取って処理できます。アプリケーションは、異常なイベントごとに、イベントの前後10秒のタイムスタンプの範囲をシークする場合があります。これは、ログ・ファイル内の例外の分析に似ています。イベントの前後にログ・エントリを参照して、例外が以前の問題によって引き起こされたかどうか、または例外によってダウンストリームの問題が発生したかどうかを確認することが一般的です。

サイトの問題を評価するには、キー・カード・リーダー・データ(KCdata)を永続表にロードできます。たとえば、複数のアプリケーションでこのデータを使用する場合は、複数のアプリケーションで使用できるOracle Database表にその日付をロードして、不動産チームが建物とオフィスの使用状況を追跡できるようにすることが道理にかなっています。IT部門は、このデータを使用して、問題を処理するためにサイトにいるユーザーを判別します。

ストリーミング問合せを使用すると、施設データ(Fdata)をスキャンして、データに非定型または異常なイベントがあるかどうかを判断できます。これには、研究室温の急上昇、閉まらなかったためにアラームを発生させているドア、アラームを鳴らしている火災検知システム、または時間枠に関連付けられたその他のデータ・ポイント(たとえば、半開きにしたままのドア)などがあります。

セキュリティ・チームには、閉じられなかったドアに関するアラートが表示されます。このチームは、ストリーミング・データを使用して、午前3時17分にドアが半開きのままにされたことを確認します。その後、Seekingクエリーを使用して、30分間のウィンドウ(3:02から3:32)で他の複数のデータ・ポイント(KCdata、Fdata、Sdata)を探し、建物にアクセスしたユーザー、どのドアまたはラボにアクセスしたか、オフラインになったマシンまたは直接アクセスされたマシン、およびその他のデータ・レコードを決定できるため、開発者の状況に適切な対応ができます。

このシナリオでは、Oracle SQL Access to Kafkaを使用して、Kafkaトピックのすべてのパーティションにマップする単一のビューを作成できます。Oracle SQL Access to Kafkaでビューが作成されると、対応するグローバル一時表も作成されます。アプリケーションは、最初に開始および終了のタイムスタンプを指定してから、Oracle SQL Access to Kafkaをコールして、指定した時間範囲内の行を含むグローバル一時表をロードします。標準のOracle Database SQLトランザクション処理を活用して、大量のデータを解析し、異常なイベント内の関連するデバイス・データを識別できます。

21.6 Kafkaデータベース管理者ロールについて

Oracle SQL Access to Kafkaを管理するには、Oracle DatabaseロールOSAK_ADMIN_ROLEを付与し、管理者ロールおよびKafka管理APIパッケージに必要な管理権限を付与します。

Oracleは、Oracle SQL access for Kafka管理権限を管理ユーザーに付与するロールベースの認証を提供するために、Oracle Database 23c以降でOSAK_ADMIN_ROLEを提供します。このロールは、Oracle SQL Access to Kafkaの管理者ユーザーに付与できます。このロールは、Kafkaクラスタを構成、登録および管理するためにOracle SQL access for Kafka管理者として指定するユーザーに必要なシステム権限を付与します。このロールによって付与されるシステム権限は次のとおりです:

  • CREATE CREDENTIAL: Kafka SASL-SSL (Simple Authentication and Security Layer)パスワードまたはOSS (Oracle Streaming Service) authTokenを作成します
  • CREATE ANY DIRECTORY: クラスタ・アクセスおよびクラスタ構成ディレクトリを作成します
  • DROP ANY DIRECTORY: クラスタ・アクセスおよびクラスタ構成ディレクトリを削除します
  • sys.dbms_kafka_clustersへのREAD権限
  • sys.dbms_kafka_applicationsへのREAD権限
  • sys.dbms_kafka_messagesへのREAD権限

21.7 ユーザーへのKafkaデータベース・アクセスの有効化

アプリケーション・ユーザー・アカウントには、OSAKへのアクセスに必要なDBMS_KAFKAデータベース権限が付与されます。

DBAとして、Oracle SQL access for Kafkaを管理および使用する権限を作成してユーザーに付与します。ユーザーは、次の2つのカテゴリがあります:

  • Oracle SQL Access to Kafka管理者は特権ユーザーです。Oracle SQL Access to Kafkaの管理を簡略化するために、Oracle DBAが指定されたKafka管理者にOSAK_ADMIN_ROLEを付与することをお薦めします。このロールは、Oracle Database 23c以降のデータベースに事前作成されます。

    管理者は、DBMS_KAFKA_ADMパッケージ・メソッドを実行して、Kafkaクラスタ情報を構成および管理します。OSAK_ADMIN_ROLEまたはOracle DBAのいずれかを付与されたユーザーは、オペレーティング・システム・レベルのクラスタ構成ディレクトリを作成し、そのディレクトリに構成ファイルを移入できます。Oracle SQL Access to Kafka管理者は、Kafkaクラスタ構成およびアクセス・ディレクトリのOracleディレクトリ・オブジェクトを作成します。

  • Kafkaトピック・データのアプリケーション・ユーザーには、Kafkaクラスタ・トピックからアクセスするデータにアクセスして使用できるように、DBMS_KAFKAパッケージへのアクセスに必要なREAD権限が付与されます。

例21-1 Kafka管理者ユーザーへのOSAK_ADMIN_ROLEの付与

この例では、OSAK_ADMIN_ROLEがユーザーkafka-adminに付与されます:

GRANT OSAK_ADMIN_ROLE
   TO kafka-admin; 

例21-2 Kafkaユーザーへのユーザー・アクセス権の付与

アプリケーションがOracle SQL Access to Kafkaを使用できるようにするには、DBMS_KAFKAアクセス権を付与します。これらのアプリケーション・ユーザーは、ソースKafkaクラスタおよびターゲットOracle Databaseに対する次の権限をすでに持っている必要があります:

  • CREATE SESSION
  • CREATE TABLE
  • CREATE VIEW
  • Kafkaデータにアクセスする表領域で使用可能な割当て制限
  • 登録済のKafkaクラスタのクラスタ・アクセス・ディレクトリに対する読取りアクセス

21.8 Oracle SQL Access to Kafkaでサポートされるデータ形式

Oracle SQL Access to Kafkaでは、区切りテキスト・データ(csvなど)、JSON、Avroの3つの形式で表されたKafkaレコードがサポートされます

Kafkaはスキーマを持たず、形式に依存しません。アプリケーション・データは、Kafkaレコードのキー・フィールドおよび値フィールドに不透明なバイト配列として格納されます。Kafkaキーは主にKafkaパーティションへのデータのハッシュに使用されるため、Kafkaレコードの値フィールドのみが取得され、Oracle行としてレンダリングされます。アプリケーションは、データのシリアライズとデシリアライズ、およびデータ形式の構造を定義するスキーマの提供を担当します。Oracle SQL Access for Kafkaでは、DBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP()プロシージャのoptions引数にデータ形式およびスキーマが指定されます。

ノート:

形式タイプに関係なく、作成される表およびビューには、 KAFKA_PARTITIONKAFKA_OFFSETおよびKAFKA_EPOCH_TIMESTAMPの3つの追加列が含まれます。

21.8.1 JSON形式およびOracle SQL Access to Kafka

JSONの場合、Oracle SQL Access to Kafkaにより、表またはビューの列が決定されます。

次に、オプションを使用してJSONストリーミング・アプリケーションのデータを表示する例を示します:

DECLARE
 v_options VARCHAR2;
BEGIN
 v_options := ‘{"fmt" : "JSON"}';
 SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                'ALPHA1',
                'MYAPP',
                'ExampleTopic',
                v_options);
END;
/

Javascript Object Notation (JSON)データでは、Oracle SQL Access to Kafkaにより、Kafkaデータを介してユーザー・スキーマにビューおよびグローバル一時表が作成されます。これらのビューには、接頭辞ORA$DKV_が付きます。一時表には、接頭辞ORA$DKVGTT_が付きます。パッケージDBMS_KAFKA.CREATE_xxx_APPは、固定スキーマを使用して、KafkaレコードからJSONデータを返します。

たとえば:

SQL> describe ORA$DKVGTT_ALPHA1_MYAPP_0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 VALUE                                              VARCHAR2(4000) 

VARCHAR2型の場合、VALUE列の長さは、データベースの最大varchar2長によって制限されます。VALUE列には、タイプCLOBのオプションがあります。

KAFKA_列は、KafkaレコードのパーティションID、オフセットおよびタイムスタンプを識別します。(基礎となるタイムスタンプ表現は、Unixエポック以降のミリ秒数を表す整数です。)

Kafkaレコードの値部分のデータは、VALUE列にテキストとして返されます。外部テキストの文字エンコーディングは、AL32UTF8として固定されています。Oracle SQL Access to Kafkaロジックでは、VALUE列の有効なJSON構文はチェックされません。ただし、SQL問合せのJSON演算子がVALUEデータを解析しようとすると、障害のあるJSONが検出されます。

21.8.2 区切りテキスト形式およびOracle SQL Access to Kafka

区切りテキスト形式の場合、Oracle SQL Access to Kafkaにより、Kafkaデータを持つユーザー・スキーマにビューおよび一時表が作成されます。

CSVやカンマ区切りデータなどの区切りデータの場合、Oracle SQL Access to Kafkaにより、Kafkaデータを介してユーザー・スキーマにビューおよびグローバル一時表が作成されます。これらのビューには、接頭辞ORA$DKV_が付きます。一時表には、接頭辞ORA$DKVGTT_が付きます。DSV形式の場合、データ列は、オプションで渡された参照表と3つのメタデータ列に基づきます

Oracle SQL Access to Kafkaのデリミタ付きテキスト形式を使用して作成された一時表およびビューには、Kafkaレコードの値フィールド内のデリミタ付きテキスト・データの形状を反映する列があります。Oracle SQL Access to Kafkaでは、テキスト・データは表およびビュー定義で表現されるネイティブOracleデータ型に変換されます。外部テキストの文字エンコーディングは、AL32UTF8として固定されています。

Kafkaレコードが取得されると、正規レイアウトが作成されます。これは、Kafkaパーティション識別子(INTEGER)、Kafkaレコード・オフセット(INTEGER)およびKafkaレコード・タイムスタンプ(INTEGER)で始まり、Kafka値内のデリミタ付きテキスト・データが続きます。つまり、Kafkaデータはフラット化され、ビュー・スキーマ定義の順序を使用して純粋なデリミタ付きテキスト・フィールドの行としてストリーミングされます。

次のOracleデータ型がサポートされています:

  • INTEGERINTNUMBER
  • CHARVARCHAR2
  • NCHARNVARCHAR2
  • CLOBNCLOBBLOB
  • FLOATBINARY_FLOATBINARY_DOUBLE
  • TIMESTAMPDATE
  • TIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE
  • INTERVAL
  • RAW
  • BOOLEAN

アプリケーション作成時のデリミタ付きテキストの仕様を簡略化するには、Kafkaレコード値フィールドでユーザー・データの列が物理的に順序付けされた順序でこれらの列を記述する表の名前を指定します。Oracle SQL Access to Kafkaでは、その名前がビューおよび一時表で使用されます。

次の例は、Oracle SQL Access to Kafkaアプリケーションの作成時に指定されたデリミタ付きテキスト・データ表(参照表またはreftable)の形状を示しています。ここでも、Kafka値フィールドには、物理的な順序およびデリミタ付きテキストからの必要なデータ型変換が反映されています。

reftableは、形状を反映するOracle SQL Access to Kafkaのビューおよび一時表を作成するためにCREATE_xxx_APPコールに使用された後、保持する必要があります。ビューを再作成するには、reftableが必要です。

SQL> describe FIVDTI_SHAPE; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 F1                                                 NUMBER 
 I2                                                 NUMBER 
 V3                                                 VARCHAR2(50) 
 D4                                                 DATE 
 T5                                                 TIMESTAMP(6) 
 V6                                                 VARCHAR2(200) 
 I7                                                 NUMBER 

参照表には、Kafkaレコード値のフィールドのみが記述されています。たとえば、reftable FIVDTI_SHAPEは、F1、I2、V3、D4、T5、V6、I7がKafkaレコード値のフィールドであるKafkaレコードをサポートできる場合があります。Kafkaレコード値のフィールドは、デリミタ(カンマ・デリミタなど)で区切る必要があります。

ノート:

参照表に非表示列を含めることはできません。列の順序は、Kafkaレコードのデータ値の順序と一致する必要があります。非表示列のCOLUMN_IDNULLであるため、列リスト内での位置を特定できません。

FIVDTI_SHAPE表に記述されたデータ用として作成されたOracle SQL Access to Kafka一時表には、次のスキーマが含まれます:

SQL> describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F1                                                 NUMBER 
 I2                                                 NUMBER 
 V3                                                 VARCHAR2(50) 
 D4                                                 DATE 
 T5                                                 TIMESTAMP(6) 
 V6                                                 VARCHAR2(200) 
 I7                                                 NUMBER 

21.8.3 Avro形式およびOracle SQL Access to Kafka

Avro形式の場合、Avroスキーマを使用してデータ列と3つのメタデータ列が決定されます。

21.8.3.1 Oracle SQL Access to KafkaでのAvro形式の使用について

Oracle SQL Access to KafkaがAvro形式のKafkaデータがOracle Databaseの表およびビューで使用できるようにする方法について説明します。

Oracle Databaseの表およびビューでアプリケーション別のApache Avro形式のデータを使用できるように、Oracle SQL Access for Kafkaでは、options引数で指定されたスキーマに基づくデータ形式をDBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP()プロシージャに変換します。

Apache Avroレコードは、名前付きフィールドおよび型の順序付きリストです。レコードのスキーマは、データの構造およびその読取り方法を定義します。Avroスキーマは、Oracle SQL Access to Kafkaアプリケーションの作成時に渡す必要があります。つまり、Oracle SQL Access to Kafkaアプリケーションでは、Kafkaトピックに対して1つのAvroスキーマのみをサポートできます。トピック・ストリームで複数のスキーマ型を使用することはサポートされていません。スキーマが進化した場合、新しいOracle SQL Access to Kafkaアプリケーションを作成する必要があります。Oracle SQL Access to Kafkaは、Confluent Schema Registryをサポートしていません。Avro形式のKafkaレコードにConfluentヘッダーが含まれている場合、そのヘッダーは削除され、Oracle SQL Access to Kafkaによって無視されます。

Kafkaはスキーマを持たず、形式に依存しません。アプリケーション・データは、Apache Avroレコードのキー・フィールドおよび値フィールドに不透明なバイト配列として格納されます。Kafkaキーは主にKafkaパーティションへのデータのハッシュに使用されるため、Apache Avroレコードの値フィールドのみが取得され、Oracle Database表でOracle行としてレンダリングされます。アプリケーションは、データのシリアライズとデシリアライズ、およびデータ形式の構造を定義するスキーマの提供を担当します。

Oracle SQL Access to KafkaではAvroプリミティブ型と複合型の両方を使用できますが、アプリケーションごとに1つのタイプのみを使用できます。

21.8.3.2 Oracle SQL Access to KafkaでサポートされるAvroのプリミティブ型

Apache Avroスキーマのプリミティブ型名をデータベースで使用するために、Oracleでは、これらの型をSQLデータ型に変換します。

表21-1 Oracle SQL Access to KafkaのAvroプリミティブ型およびOracle型の変換

型の説明 Avroプリミティブ型 Oracle型

null/値なし

null

VARCHAR2(1)

(該当なし)

boolean

NUMBER(1)

32ビット符号付き整数

int

INTEGER

64ビット符号付き整数

long

INTEGER

IEEE 32ビット浮動小数点

float

BINARY_FLOAT

IEEE 64ビット浮動小数点

double

BINARY_DOUBLE

バイト配列/バイナリ

bytes

BLOB

UTF-8でエンコードされた文字列

string

VARCHAR2

次のAvroスキーマの例では、すべてのAvroプリミティブ型を使用するレコードを定義します:

{
 "type" : "record",  
 "name" : "primitives",
 "fields" : [
 { "name" : "f_null", "type" : "null" }, 
 { "name" : "f_boolean", "type" : "boolean" }, 
 { "name" : "f_int", "type" : "int"}
 { "name" : "f_long", "type" : "long" },
 { "name" : "f_float", "type" : "float" },
 { "name" : "f_double", "type" : "double" },
 { "name" : "f_bytes", "type" : "bytes" },
 { "name" : "f_string", "type" : "string"}
 ]
}

この例のAvroスキーマを使用してAvroデータ用のOracle SQL Access to Kafka一時表を作成した場合、一時表には次のスキーマが含まれます:

describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F_NULL                                             CHAR(1)
 F_BOOLEAN                                          NUMBER(1) 
 F_INT                                              NUMBER(38)
 F_LONG                                             NUMBER(38) 
 F_FLOAT                                            BINARY_FLOAT
 F_DOUBLE                                           BINARY_DOUBLE 
 F_BYTES                                            BLOB 
 F_STRING                                           VARCHAR2(4000)

VARCHAR2型の長さ(この例では、F_STRING列用)は、データベースの最大varchar2長によって決まります。

21.8.3.3 Oracle SQL Access to KafkaでサポートされるAvroの複雑な型

Apache Avroスキーマの複合型名をデータベースで使用するために、Oracleでは、これらの型をサポートされているSQLデータ型に変換します。

説明

Apache Avroの複合データ型は、指定された属性を取得します。Avro複合型を使用するために、次の表に示すように、これらはOracle型に変換されます。

表21-2 Oracle SQL Access to KafkaのAvro複合型およびOracle型の変換

Avro複合型 Oracle型 型の説明

fixed

BLOB

fixed型は、バイナリ・データの格納に使用できる固定長フィールドの宣言に使用されます。フィールドの名前と1バイトのサイズの2つの必須属性があります。

enum

VARCHAR2

Avro enumフィールド。

Avro enumは列挙型です。これは、型名がenumのJSON文字列で構成され、enumの名前を取得し、追加のオプション属性を使用できます。

record

VARCHAR2

構造フィールド。

構造フィールドは、入力Avroレコードのフィールドに対応します。レコードは、すべてが結合されて1つのことを記述する属性のカプセル化を表します。

map

VARCHAR2

mapは、データをキーと値のペアとして編成する連想配列(またはディクショナリ)です。Avro mapのキーは文字列である必要があります。Avro mapでは、1つの属性(値)のみがサポートされます。この属性は必須であり、mapの値部分のタイプを定義します。

値には任意の型を使用できます。

array

VARCHAR2

任意の型のarray

arrayタイプはarrayフィールドを定義します。これは必須のitems属性のみをサポートします。items属性は、array内のitemsの型を識別します。

ノート:

Avro複合型recordmapおよびarrayは、VARCHAR2型への変換の前にJSON形式の文字列に変換されます。

次のAvroスキーマの例では、すべてのAvro複合型を使用するレコードを定義します:

{
 "type" : "record",
 "name" : "complex",
 "fields" : [
  { "name" : "f_fixed", 
    "type" : { "type" : "fixed", "name" : "ten", "size" : 10} 
  },
  { "name" : "f_enum",
    "type" : { "type" : "enum", "name" : "colors",
               "symbols" : [ "red", "green", "blue" ]   }  
  },
  { "name" : "f_record", 
    "type" : {"type" : "record", "name" : "person",
              "fields" : [ { "name" : "first_name", "type" : "string" },
                           { "name" : "last_name",  "type" : "string"} ] }
  }, 
  { "name" : "f_map",
    "type" : { "type" : "map", "values" : "int" }
  },
  { "name" : "f_array",
    "type" : {"type" : "array", "items" : "string"  }
  }]
}

この例のAvroスキーマを使用してAvroデータ用のOracle SQL Access to Kafka一時表を作成した場合、一時表には次のスキーマが含まれます:

describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F_FIXED                                            BLOB
 F_ENUM                                             VARCHAR2(4000)
 F_RECORD                                           VARCHAR2(4000)
 F_MAP                                              VARCHAR2(4000) 
 F_ARRAY                                            VARCHAR2(4000)

VARCHAR2型の長さ(この例では、F_ENUMF_RECORDF_MAPおよびF_ARRAY列)は、データベースの最大varchar2長によって決定されます。

21.8.3.4 Oracle SQL Access to KafkaでサポートされるAvroの論理型

Apache Avroスキーマの論理型名をデータベースで使用するために、Oracleでは、これらの型をサポートされているSQLデータ型に変換します。

説明

Avro論理型は、Avroプリミティブ型または複合型で、導出型を表す追加の属性があります。論理型は、次の表に示すようにOracle型に変換されます。

表21-3 Oracle SQL Access to KafkaのAvro複合型およびOracle型の変換

型の説明 Avro論理型 Oracle型

10進数: スケールなし ×10-scale形式の任意の精度の符号付き10進数

decimal (bytes, fixed)

NUMBER

UUID (汎用一意識別子)、GUIDS (グローバル一意識別子)とも呼ばれます:

これらのIDは、RFC-4122に準拠してランダムに生成されます。

UUID (string)

サポートされていません。

日付

特定のタイム・ゾーンや時間を参照しないカレンダ内の日付

Unixエポック(1970年1月1日)からの日数

date (int)

DATE

時間(ミリ秒):

特定のカレンダ、タイム・ゾーンまたは日付を参照せずに、午前0時以降のミリ秒数として表される時刻: 00:00:00.000

time-millis (int)

TIMESTAMP

時間(マイクロ):

特定のカレンダ、タイム・ゾーンまたは日付を参照せずに、午前0時以降のマイクロ秒数として表される時刻: 00:00:00.000000

time-micros (long)

TIMESTAMP

タイムスタンプ(ミリ) UTC:

特定のタイム・ゾーンまたはカレンダに関係なく、Unixエポック(1970年1月1日)からのミリ秒数として表されるグローバル・タイムライン上の時点: 00:00:00.000 UTC

timestampmillis (long)

TIMESTAMP

タイムスタンプ(マイクロ) UTC:

特定のタイム・ゾーンまたはカレンダに関係なく、Unixエポック(1970年1月1日)からのマイクロ秒数として表されるグローバル・タイムライン上の時点: 00:00:00.000000 UTC

timestampmicros (long)

TIMESTAMP

時間

月数、日数およびミリ秒数で定義された時間。

fixed (size:12)

サポートされていません。

ノート:

論理型time-millis、time-macros、timestampmillisおよびtimestampmicrosで使用される10進型は、バイト配列として内部的に格納されます(固定または非固定)。Avroライターによっては、これらの配列の中には小数点の文字列表現を格納するものもあれば、スケーリングされていない値を格納するものもあります。あいまいなデータを表示しないように、オプションavrodecimaltypeを使用して、使用する表現を明示的に宣言することをお薦めします。このオプションが明示的に指定されていない場合、Oracle SQL Access to Kafkaのデフォルト・オプションでは、スケーリングされていないデータの表現がファイルの10進列に格納されます。

次のAvroスキーマの例では、すべてのAvro論理型を使用するレコードを定義します:

{
  "type" : "record",
  "name" : "logical",
  "fields" : [ {
  "name" : "f_decimal",
  "type" : {
      "type" : "bytes",
      "logicalType" : "decimal",
      "precision" : 4,
      "scale" : 2
    }
  }, {
  "name" : "f_date",
  "type" : {
      "type" : "int",
      "logicalType" : "date"
    }
  }, {
  "name" : "f_time_millis",
  "type" : {
      "type" : "int",
      "logicalType" : "time-millis"
    }
  }, {
  "name" : "f_time_micros",
  "type" : {
      "type" : "long",
      "logicalType" : "time-micros"
    }
  }, {
  "name" : "f_timestamp_millis",
  "type" : {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    }
  }, {
  "name" : "f_timestamp_micros",
  "type" : {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    }
  } ]
}

この例のAvroスキーマを使用してAvroデータ用のOracle SQL Access to Kafka一時表を作成した場合、一時表には次のスキーマが含まれます:

describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F_DECIMAL                                          NUMBER
 F_DATE                                             DATE
 F_TIME_MILLIS                                      TIMESTAMP(3)
 F_TIME_MICROS                                      TIMESTAMP(6)
 F_TIMESTAMP_MILLIS                                 TIMESTAMP(3)
 F_TIMESTAMP_MICROS                                 TIMESTAMP(6)

21.9 Kafkaクラスタへのアクセスの構成

保護されているKafkaクラスタまたは保護されていないKafkaクラスタへのアクセスを構成できます

21.9.1 クラスタ・アクセス・ディレクトリの作成

Oracle SQL Access to Kafka管理者は、クラスタへのデータベース・ユーザー・アクセスを制御するために、Kafkaクラスタごとにクラスタ・アクセス・ディレクトリ・オブジェクトを作成する必要があります。

クラスタ・アクセス・ディレクトリは、Kafkaクラスタ構成ファイルを含むOracleディレクトリ・オブジェクトです。このディレクトリはすべてのクラスタに必要です。Kafkaクラスタにアクセスするには、Kafkaクラスタごとに独自のクラスタ・アクセス・ディレクトリが必要です。Oracle SQL Access to Kafka管理者として、クラスタ・アクセス・ディレクトリ・オブジェクトを作成し、Kafkaクラスタにアクセスする必要があるデータベース・ユーザーにこのディレクトリへのREADアクセス権を付与することで、Kafkaクラスタへのアクセスを管理します。DBMS_KAFKA_ADM.REGISTER_CLUSTER()プロシージャをコールする前に、クラスタ・アクセス・ディレクトリを作成する必要があります。

例21-3 クラスタ・アクセス・ディレクトリ・オブジェクトの作成とREADアクセス権の付与

まず、クラスタ・アクセス・ディレクトリ・オブジェクトを作成します。この例では、オブジェクトはosak_kafkaclus1_accessです:

 CREATE DIRECTORY osak_kafkaclus1_access AS '';;

Kafkaクラスタが正常に登録されたら、Oracle SQL Access to Kafka管理者は、このディレクトリに対するREADアクセス権をユーザーに付与します。

この例では、ユーザーexample_userosak_kafkaclus1_accessへのアクセス権が付与されています:

 GRANT READ ON DIRECTORY osak_kafkaclus1_access TO example_user;

21.9.2 Kafka構成ファイル(osakafka.properties)

Kafkaクラスタにアクセスするには、Kafkaクラスタへのアクセスに必要な情報を含む構成ファイルを作成する必要があります。

21.9.2.1 Kafka構成ファイルについて

osakafka.propertiesファイルには、保護されているKafkaクラスタへのアクセスに必要な構成情報と、Oracle SQL Access to Kafkaに関する追加情報が含まれています。

Kafka構成ファイルosakafka.propertiesは、クラスタ・アクセス・ディレクトリに作成されます。osakafka.propertiesファイルは、(OSAK_ADMIN_ROLEが付与された) Oracle SQL Access to Kafka管理者が作成します。このファイルは、Apache Kafkaクラスタに接続するためにDBMS_KAFKA_ADMパッケージによって使用されます。

Oracle SQL Access to Kafka管理者は、各Kafkaクラスタの構成ファイルを格納するクラスタ・アクセス・ディレクトリを作成します。各クラスタ・アクセス・ディレクトリには、独自のKafka構成ファイルがあります。Apache Kafkaクラスタへのアクセスを管理するために、1人のOracle SQL Access to Kafka管理者のみが、Kafkaクラスタのクラスタ・アクセス・ディレクトリに対する読取りおよび書込みアクセス権を持ちます。他のユーザーには、クラスタ・アクセス・ディレクトリまたはKafka構成ファイルに対する権限は付与されません。

Kafka構成ファイルの機能

osakafka.propertiesファイルは、librdkafkaを使用するKafkaコンシューマで使用されるコンシューマ・プロパティ・ファイルに似ています。セキュアなApache Kafkaクラスタには、認証局、クライアント秘密キーおよびクライアント公開証明書(PEM)などの資格証明ファイルが必要です。これらの追加ファイルも、librdkafkaを使用するKafkaコンシューマに必要なものと似ています。osakafka.propertiesファイルには、次のプロパティがあります:

  • Kafkaクラスタへのアクセスに必要な設定および構成の一部として、Oracle SQL Access to Kafka管理者によって作成および管理されます。
  • キーと値のペアのテキスト・ファイルで構成されます。各行の形式は、キーおよびを記述するキー=で、改行で終了します。改行文字をキーまたは値の一部にすることはできません。
  • osak接頭辞で識別されるOracle SQL Access to Kafkaパラメータが含まれます。
  • Oracle SQL Access to Kafkaのデバッグ・プロパティが含まれます。
  • librdkafka APIを使用してKafkaクラスタに接続するためにDBMS_KAFKA_ADMパッケージによって使用されます。
  • セキュアなKafkaクラスタが、librdkakaインタフェースを使用してKafkaクラスタに接続するために必要なセキュリティ構成プロパティ、osak接頭辞で識別されるOracle SQL Access to Kafkaチューニング・プロパティ、およびデバッグ・プロパティを格納するために必要です。セキュアなクラスタ・アクセスの場合、キーと値のペアには、SSL/TLS証明書、クライアントの公開キーおよび秘密キーなどのクラスタ構成ファイルが含まれます。
  • セキュアでないKafkaクラスタでは、クラスタ接続用のチューニング・プロパティとデバッグ・プロパティを含めることはオプションです

osakafka.propertiesファイルは、Oracle SQL Access to Kafkaクラスタ・アクセス・ディレクトリのパスORACLE_base/osak/clusters/cluster-name/configに格納されます。ここで、Oracle_baseは、ターゲットOracle DatabaseのOracleベース・ディレクトリで、cluster-nameは、アクセス情報が構成ファイルに格納されるKafkaクラスタの名前です。

Kafka構成ファイルの作成に関するガイドライン

Apache Kafkaクラスタ、Oracle SQL Access to Kafka管理者へのアクセスに必要な設定および構成の一部として、このファイルの情報を使用して、Cインタフェースでセッション・コンテキストを設定します。これにより、librdkafka APIを使用してKafkaクラスタに接続されます。

SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIESシステム表には、サポートされているコンシューマ構成プロパティ(セキュリティ・プロパティを含む)の事前移入済リストが含まれています。拡張性のために、SYSはこの表に特定の制限付きでさらにプロパティを追加できます

DBMS_KAFKA_ADM.REGISTER_CLUSTER()プロシージャは、SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIESシステム表にもリストされているosakafka.propertiesファイルからこれらのプロパティのみを読み取ります。その他のプロパティは無視されます。

21.9.2.2 Oracle SQL Access for Kafka構成ファイルのプロパティ

osakafka.propertiesファイルを作成するには、ここで説明するようにプロパティを確認して指定します。

osakafka.propertiesファイル処理

osakafka.propertiesに指定するプロパティは、次の表に示すプロパティである必要があります。その他のキーと値のペアを指定すると、これらの値は無視されます。

次の点に注意してください。

  • osak接頭辞があるプロパティ名は、内部チューニング・プロパティまたはデバッグ・プロパティです。
  • osak接頭辞がないプロパティ名は、librdkafkaで使用されるKafkaコンシューマ・プロパティです。プロパティの完全なリストについては、Apache Kafka C/C++クライアント・ライブラリ(librdkafka)のドキュメントを参照してください。
プロパティ 許可される値 説明
security.protocol

PLAINTEXT

SSL

SASL_PLAIN_TEXT

SASL_SSL

Kafkaブローカとの通信に使用されるセキュリティ・プロトコル

sasl.mechanisms

GSSAPI

PLAIN

SCRAM-SHA-256

SCRAM-SHA-512

認証に使用するSASLメカニズム

ノート: 複数形の名前にしても、構成する必要があるメカニズムは1つのみです。

このプロパティは、古いKafkaクラスタに下位互換性を提供できます。可能な場合は、プロパティsasl.mechanismをかわりに使用することをお薦めします。

sasl.mechanism

GSSAPI

PLAIN

SCRAM-SHA-256

SCRAM-SHA-512

認証に使用するSimple Authentication and Security Layer (SASL)メカニズム

ssl.ca.location

クラスタ構成ディレクトリ内のファイル ブローカ・キーを検証するための認証局(CA)証明書のファイル名。絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。
ssl.key.location クラスタ構成ディレクトリ内のファイル

クライアント秘密キーのファイル名

絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。

対応するパスワード値は、DBMS_CREDENTIALCREATE_CREDENTIAL()プロシージャを使用してデータベース資格証明として格納する必要があります

ssl.certificate.location クラスタ構成ディレクトリ内のファイル

クライアント公開(PEM)キーのファイル名

絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。

ssl.endpoint.identification.algorithm

有効な値:

https

none

Kafkaブローカ証明書を使用してKafkaブローカ・ホスト名を検証するためのエンドポイント識別アルゴリズム。値は次のとおりです。

https: RFC2818で指定されたサーバー(Kafkaブローカ)ホスト名検証。

none: エンドポイント検証は行われません。

デフォルト値: なし

sasl.username ユーザー名

Kafkaクラスタへの認証に必要なユーザー名。

このユーザー名に対応するパスワード値は、DBMS_CREDENTIALCREATE_CREDENTIAL()プロシージャを使用してデータベース資格証明として格納する必要があります

sasl.kerberos.principal クライアントKafka Kerberosプリンシパル名 クライアントKerberosプリンシパル名
sasl.kerberos.ccname Kerberosチケット・キャッシュ・ファイル名

Kerberosチケット・キャッシュ・ファイル

例: krb5ccname_osak

このファイルは、クラスタ構成ディレクトリに存在する必要があります。

sasl.kerberos.config Kerberos構成ファイル名

KafkaクラスタのKerberos構成。例: krb5.conf

このファイルは、クラスタ構成ディレクトリに存在する必要があります

sasl.kerberos.service.name Kerberosプリンシパル名(Kafkaプライマリ名) Kerberosプリンシパルのプライマリ名。スラッシュ(/)の前に表示される名前です。たとえば、kafkaはKerberosプリンシパルkafka/broker1.example.com@EXAMPLEのプライマリ名です。
max.partition.fetch.bytes 1024 * 1024

librdkafkaSDKクライアントの場合、パーティションごとに1MBを割り当てることをお薦めします。

debug すべて 接続の問題をデバッグするために使用されます。

次に、セキュリティ・プロトコルSSLを指定し、クライアント上で認証局(CA)証明書を使用して認証を提供するosakafka.propertiesファイルの例を示します:

security.protocol=ssl
ssl.ca.location=ca-cert
ssl.certificate.location=client_myhostname_client.pem 
ssl.key.location=client_myhostname_client.key
21.9.2.3 Kafkaアクセス・ディレクトリの作成

セキュアなKafkaクラスタにアクセスするには、KafkaクラスタごとにKafkaアクセス・ディレクトリを作成する必要があります。

Oracle SQL Access to Kafka管理者は、オペレーティング・システム・ディレクトリOracle-base/osak/cluster_name/configを作成します。この場合、Oracle-baseはOracleベース・ディレクトリ、cluster_nameSYS.DBMS_KAFKA_ADM.REGISTER_CLUSTERコールに渡されるクラスタ名パラメータの値です。各Kafkaクラスタには、独自の専用Kafkaクラスタ・ディレクトリが必要です。

このディレクトリには、Kafkaクラスタへのアクセスに必要なすべての構成ファイルが含まれている必要があります:

  • osakafka.propertiesファイル。
  • osakafka.propertiesファイルにリストされているセキュリティ・ファイル

次の例では、Oracleベース・ディレクトリは/u01/app/oracleで、クラスタ名はkafkaclus1です:


 mkdir u01/app/oracle/osak/kafkaclus1/config;
 CREATE DIRECTORY osak_kafkaclus1_config AS 
 ‘u01/app/oracle/osak/kafkaclus1/config’ ;

21.9.3 Kafka構成ファイルのプロパティ

ここで説明するプロパティは、Kafka構成ファイルosakafka.propertiesで使用されます。

説明

Kafka構成ファイルのプロパティには、Apache Kafkaクラスタの構成情報が含まれています。Kafka構成ファイルには、2つのカテゴリのプロパティ名があります:

  • コンシューマ構成プロパティ・パラメータは、Apache Kafkaブローカで使用されるプロパティです。これらのファイル
  • Oracleプロパティは、osak接頭辞が付いたプロパティ名です。これらのプロパティは、内部チューニングまたはデバッグに使用されます。

Kafka構成ファイルにリストされているプロパティは、サポートされているすべてのプロパティを含むシステム表SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIESとクロスチェックされます。osakafka.propertiesファイルに指定されているが、SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES表にリストされていないプロパティは、OSAKによって無視されます。osakafka.propertiesファイルで許可されるプロパティおよび値を次に示します:

表21-4 Kafka構成ファイルのプロパティ名および説明

プロパティ名 許可される値 説明

security.protocol

PLAINTEXTSSLSASL_PLAIN_TEXTSASL_SSL

Kafkaブローカとの通信に使用されるセキュリティ・プロトコル

sasl.mechanisms

GSSAPIPLAINSCRAM-SHA-256SCRAM-SHA-512

認証に使用するSASLメカニズム

ノート: 複数形の名前にしても、構成する必要があるメカニズムは1つのみです。

このプロパティは、古いKafkaクラスタに下位互換性を提供できます。可能な場合は、プロパティsasl.mechanismをかわりに使用することをお薦めします。

sasl.mechanism

GSSAPIPLAINSCRAM-SHA-256SCRAM-SHA-512

認証に使用するSASLメカニズム

ssl.ca.location

クラスタ構成ディレクトリ内のファイル

ブローカ・キーを検証するための認証局(CA)証明書のファイル名。絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます

ssl.key.location

クラスタ構成ディレクトリ内のファイル

クライアント秘密キーのファイル名

絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます

対応するパスワード値は、DBMS_CREDENTIALCREATE_CREDENTIAL()プロシージャを使用してデータベース資格証明として格納する必要があります

ssl.certificate.location

クラスタ構成ディレクトリ内のファイル

クライアント公開(PEM)キーのファイル名

絶対パスが指定されている場合、パスの最後のトークンがファイル名とみなされます。

ssl.endpoint.identification.algorithm

有効な値: httpsnone

デフォルト値: なし

Kafkaブローカ証明書を使用してKafkaブローカ・ホスト名を検証するためのエンドポイント識別アルゴリズム。

https: RFC2818で指定されたサーバー(Kafkaブローカ)ホスト名検証。

none: エンドポイント検証は行われません。

sasl.username

Kafkaクラスタでの認証に必要なユーザー名

Kafkaクラスタでの認証に必要なユーザー名。

対応するパスワード値は、DBMS_CREDENTIALCREATE_CREDENTIAL()プロシージャを使用してデータベース資格証明として格納する必要があります

sasl.kerberos.principal

クライアントKafka Kerberosプリンシパル名

クライアントKerberosプリンシパル名

sasl.kerberos.ccname

Kerberosチケット・キャッシュ・ファイル名

Kerberosチケット・キャッシュ・ファイル

例: krb5ccname_osak

このファイルは、クラスタ構成ディレクトリに存在する必要があります。

sasl.kerberos.config

Kerberos構成ファイル名

KafkaクラスタのKerberos構成。

例: krb5.conf

このファイルは、クラスタ構成ディレクトリに存在する必要があります

sasl.kerberos.service.name

Kafkaで実行するKerberosプリンシパル名。

Kafkaで実行するKerberosプリンシパル名。

max.partition.fetch.bytes

1024 * 1024

librdkafkaSDKクライアントに対してパーティションごとに1MBを割り当てることをお薦めします。

debug

すべて

接続の問題をデバッグするために使用されます

例21-4 プロパティを含む構成ファイル

osakafka.properties file for security protocol: SSL with client authentication
security.protocol=ssl
ssl.ca.location=ca-cert
ssl.certificate.location=client_myhostname_client.pem 
ssl.key.location=client_myhostname_client.key

21.9.4 クラスタ・アクセス・ディレクトリに必要なセキュリティ構成ファイル

セキュリティ・プロトコルに基づいて、必要な構成ファイルを特定します。

セキュアなKafkaクラスタへのアクセスを構成するには、Oracle SQL Access to Kafka管理者は、Kafkaクラスタ・アクセス・ディレクトリから複数の構成ファイルを追加する必要があります。必要なファイルのリストは、Kafkaクラスタでセキュリティを構成するために使用されるセキュリティ・プロトコルによって異なります。ファイル・リストには、認証局ファイル、SSLクライアント公開証明書ファイル(PEM形式)、SSLクライアント非公開キー・ファイルなどのファイルを含めることができます。

ノート:

Kerberosチケット管理はOracle SQL Access to Kafkaの外部で処理されるため、Kerberos keytabファイルは必要ありません。
21.9.4.1 SASL_SSL/GSSAPI

クラスタ・アクセス・ディレクトリに必要なファイルであるGSSAPI認証プロトコルを使用するSASL_SSLを持つApacheクラスタ

説明

SASL_SSL/GSSAPIプロトコルは、暗号化を使用したKerberos認証を指定します。Kerberosチケットは外部で管理する必要があります(Oracle SQL Access to Kafkaの外部)。

DBMS_CREDENTIAL

Kerberosチケットは外部で管理されるため、必須ではありません。

クラスタ・アクセス・ディレクトリの必須ファイル

  1. 認証局(CA)ファイル
  2. CAファイルを指定するSSL.CA.locationを含むosakafka.propertiesファイルは、SSL認証局です。

次の例では、プロパティsecurity.protocolSASL_SSLを指定します。プロパティsasl.mechanismは、GSSAPIを指定します。CAファイルはCA-cert.pemで、プロパティssl.CA.locationで指定されます。

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.config=krb5.conf 
sasl.kerberos.ccname=krb5ccname_osak 
sasl.kerberos.principal=kafkaclient/<FQDN-hostname>@<Realm>
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https
21.9.4.2 SASL_PLAINTEXT/GSSAPI

クラスタ・アクセス・ディレクトリに必要なファイルであるGSSAPI認証プロトコルを使用するSASL_PLAINTEXTを持つApacheクラスタ

説明

SASL_PLAINTEXT/GSSAPIプロトコルは、暗号化を使用せずにKerberos認証を指定します。Kerberosチケットは外部で管理する必要があります(Oracle SQL Access to Kafkaの外部)。

DBMS_CREDENTIAL

Kerberosチケットは外部で管理されるため、必須ではありません。

クラスタ・アクセス・ディレクトリの必須ファイル

  1. CAファイルを指定するSSL.CA.locationを含むosakafka.propertiesファイルは、SSL認証局です。

次の例では、プロパティsecurity.protocolSASL_PLAINTEXTを指定し、プロパティsasl.mechanismGSSAPIを指定します。

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafkaclient/FQDN-hostname@Realm
sasl.kerberos.config=krb5.conf 
sasl.kerberos.ccname=krb5ccname_osak 
21.9.4.3 SASL_PLAINTEXT/SCRAM-SHA-256

クラスタ・アクセス・ディレクトリに必要なファイルであるSCRAM-SHA-256認証プロトコルを使用するSASL_PLAINTEXTを持つApacheクラスタ

説明

SASL_PLAINTEXT/SCRAM-SHA-256プロトコルは、暗号化を使用せずにSASL SCRAM認証を指定します。

DBMS_CREDENTIAL

SASLユーザー名のパスワードを格納するために必要です。

クラスタ・アクセス・ディレクトリの必須ファイル

  1. osakafka.propertiesファイル。

次の例では、プロパティsecurity.protocolSASL_PLAINTEXTを指定し、プロパティsasl.mechanismSCRAM-SHA-256を指定します。

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.username=testuser 
21.9.4.4 SASL_SSL/PLAIN

クラスタ・アクセス・ディレクトリに必要なファイルであるPLAIN認証プロトコルを使用するSASL_SSLを持つApacheクラスタ

説明

SASL_SSL/PLAINプロトコルは、使用されたOSS Kafkaクラスタの設定を指定します

DBMS_CREDENTIAL

sasl.passwordを格納するために必要です。

クラスタ・アクセス・ディレクトリの必須ファイル

  1. osakafka.propertiesファイル。

例21-5 OSSクラスタのosakafka.propertiesファイル

次の例では、プロパティsecurity.protocolSASL_SSLを指定し、プロパティsasl.mechanismPLAINを指定します。

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<tenancyName>/<username>/<streamPoolID>
 #-- limit request size to 1 MB per partition
max.partition.fetch.bytes=1048576

例21-6 非OSSクラスタのosakafka.propertiesファイル

次の例では、プロパティsecurity.protocolSASL_SSLを指定し、プロパティsasl.mechanismPLAINを指定します。ssl.ca.locationプロパティは、認証局(CA)ファイルを指定します。CAファイルはCA-cert.pemです。

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=kafkauser
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https
21.9.4.5 クライアント認証を使用したSSL

クラスタ・アクセス・ディレクトリに必要なファイルであるSSL認証プロトコルを使用するApacheクラスタ

説明

SSLプロトコルは、クライアント認証を使用してSSLを指定します。

DBMS_CREDENTIAL

SSLキーのパスワードを格納するために必要です。

クラスタ・アクセス・ディレクトリの必須ファイル

  1. osakafka.propertiesファイル。
  2. 構成権限(CA)ファイル
  3. rdkafkaクライアントのPEMファイル(rdkafka.client.pem)
  4. rdkafkaクライアント・キー(rdkafka.client.key)

例21-7 SSL osakafka.propertiesファイル

次の例では、プロパティsecurity.protocolSSLを指定し、プロパティssl.key.locationはrdkafkaクライアント・キーを指定し、sa.ca.locationプロパティは認証局ファイルを指定します。

security.protocol=SSL
ssl.certificate.location=rdkafka.client.pem
ssl.key.location=rdkafka.client.key
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https
21.9.4.6 クライアント認証を使用しないSSL

クラスタ・アクセス・ディレクトリに必要なファイルであるSSL認証プロトコルを持ちクライアント認証を使用しないApacheクラスタ

説明

SSLプロトコルは、クライアント認証を使用せずにSSLを指定します。

DBMS_CREDENTIAL

必須ではありません。

クラスタ・アクセス・ディレクトリの必須ファイル

  1. osakafka.propertiesファイル。
  2. 構成権限(CA)ファイル

例21-8 SSL osakafka.propertiesファイル

次の例では、プロパティsecurity.protocolSSLを指定し、sa.ca.locationプロパティは認証局ファイルを指定します。

security.protocol=SSL
ssl.ca.location=ca-cert.pem
ssl.endpoint.identification.algorithm=https

21.10 Oracle SQL Access to Kafkaアプリケーションの作成

Apacheクラスタ・データにアクセスするアプリケーションを作成するには、必要なアプリケーションのタイプを作成します。

Oracle SQL Access to Kafkaには、Apache Kafkaクラスタへのアタッチに使用できる次のアプリケーション・モードがあります:

  • ロード中: KafkaトピックからOracle Database表にデータをロードするために使用します。
  • ストリーミング: Kafkaトピックを順次読み取るために使用します。
  • シーク可能: 指定した開始タイムスタンプから終了タイムスタンプまでの間にKafkaトピックにランダムにアクセスするために使用します。

必要なKafkaトピックへのアクセスの種類に応じて、作成するアプリケーションのタイプを選択します:

  • DBMS_KAFKA.CREATE_LOAD_APPは、ロード・モードで使用できるアプリケーションを作成します。
  • DBMS_KAFKA.CREATE_STREAMING_APPは、ストリーミング・モードで使用できるアプリケーションを作成します。
  • DBMS_KAFKA.CREATE_SEEKABLE_APPは、シーク可能モードで使用できるアプリケーションを作成します。

例21-9 Kafkaトピックの4つのビューを使用したストリーミング・アプリケーションの作成

次の例では、4つのパーティションを持つKafkaトピックの一時表を含む一連の4つのビューを使用するためのストリーミング・アプリケーションが作成されます。ビューごとに一時表が作成されます。各ビュー(および一時表)は、Kafkaトピックの1つのパーティションに関連付けられています:


DECLARE
 v_options VARCHAR2;
BEGIN
 v_options := ‘{"fmt" : "DSV", "reftable" : "user_shape_table_name"}';
 SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                'ExampleCluster',
                'ExampleApp',
                'ExampleTopic',
                v_options,
                4);
END;
/

例21-10 Kafkaトピックの1つのビューを使用したストリーミング・アプリケーションの作成

次の例では、Kafkaトピックの一時表が4つのパーティションを持つ一時表と1つのビューを使用するためのストリーミング・アプリケーションが作成されます。ビュー(一時表)は、Kafkaトピック全体に関連付けられています:


DECLARE
 v_options VARCHAR2;
BEGIN
 v_options := ‘{"fmt" : "DSV", "reftable" : "user_shape_table_name"}';
 SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                'ExampleCluster',
                'ExampleApp',
                'ExampleTopic',
                v_options,
                1);
END;
/

21.11 Kafkaクラスタ接続のセキュリティ

Oracle SQL Access to Kafkaは、SSL、SASL、Kerberosなどの様々なセキュリティ・メカニズムを使用して、KafkaおよびOracle Streaming Service (OSS)へのアクセスをサポートします。

ノート:

Kafkaクラスタへのアクセスに使用される資格証明には、Kafkaブローカ・メタデータと、Oracle SQL Access to Kafkaアプリケーションの一部となるトピックの両方へのアクセス権が必要です。資格証明に対して有効なアクセス制御リスト(ACL)がある場合は、ブローカとKafkaトピックの両方にアクセス権が付与されていることを確認します。共有Oracle Real Application Clusters (Oracle RAC)環境では、セキュリティ資格証明は、クラスタ・メンバー・ノードに対してローカルではなく、共有の場所に配置する必要があります。

セキュアなKafkaクラスタ

Oracle Databaseとクラスタ間でセキュアに暗号化されたデータ転送を維持するために、Oracle SQL Access to Kafkaには、いくつかのセキュリティ・プロトコルが採用されています。セキュアなKafkaクラスタおよびOracle Streaming Services (OSS)クラスタへのアクセスには、セキュリティ構成ファイルが使用されます。これらのオペレーティング・システム・ファイルは、クラスタ構成ディレクトリに存在する必要があります。クラスタ構成ファイルにアクセスするために、クラスタ構成Oracleディレクトリ・オブジェクトが作成されます。このディレクトリへのREADアクセス権が付与されるのは、osak_admin_roleのみです。クラスタ構成ファイルは、osak_admin_roleによってのみ読取り可能です。クラスタ構成ファイルには、osakafka.propertiesファイルと、SSL/TLS/PEMファイルや証明書などの追加のセキュリティ・ファイルが含まれます。SSLのキーおよび証明書は、Oracleキーストアに格納されます。

クラスタ・アクセスOracleディレクトリ・オブジェクトは、Kafkaクラスタへのアクセスを制御するために使用されます。このディレクトリ・オブジェクトには構成ファイルが含まれていません。Kafkaセッションは、マルチテナント環境の個々のPDBに限定されます。Kafkaブローカに接続するアプリケーションを作成する各PDBは、独自のアプリケーションを作成する必要があります。

パスワードをファイルに埋め込まないでください。osakafka.propertiesファイルに埋め込まれたパスワード・プロパティは無視されます。すべてのパスワードは、DBMS_CREDENTIALパッケージを使用してデータベース資格証明として格納する必要があります。

Kerberos認証を使用したKafkaクラスタ

Kerberos認証を使用するKafkaクラスタの場合、osakafka.propertiesファイルで指定されたKafkaプリンシパルのKerberosチケットをデータベース・システムで取得し、Oracle SQL Access to Kafkaの外部で定期的に更新する必要があります。

クラスタ構成ディレクトリ・オブジェクト、クラスタ・アクセス・ディレクトリ・オブジェクトおよびデータベース資格証明名は、DBMS_KAFKA_ADM.REGISTER_CLUSTER()コールの入力パラメータとして指定する必要があります。

Oracle SQL Access to Kafka管理者(osak_admin_roleを持つユーザー、OSAK_ADMIN)は、クラスタ登録および管理タスクを実行します。

21.12 セキュアでないKafkaクラスタへのアクセスの構成

セキュアでないKafkaクラスタへのアクセスを構成するには、OSAK管理者(osak_admin_roleを持つOracle Databaseユーザー)がこの手順を完了する必要があります。

セキュアでないKafkaクラスタにアクセスするには、Kafkaクラスタへのアクセスを制御するクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成する必要があります。このデータベース・ディレクトリの権限付与は、KafkaクラスタにアクセスできるOracle Databaseユーザーの制御に使用されます。このデータベース・ディレクトリには空のパスがあります。対応するオペレーティング・システム・ディレクトリは必要なく、ファイルも含まれていません。クラスタ・アクセス・データベース・ディレクトリ・オブジェクトのOracleディレクトリ・オブジェクト名は、OSAK_CLUSTER_NAME_ACCESSという形式(CLUSTER_NAMEはKafkaクラスタの名前)にすることをお薦めします。

手順:

  1. パスが空のクラスタ・アクセス・データベース・ディレクトリを作成します。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。

    たとえば、空のパスを持つoaskaccess_kafkaclust1という名前のクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成します。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。

    SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_ACCESS AS '';
  2. ターゲットのOracle Databaseサーバーで、パスOracle_base/osak/cluster_name/config (Oracle_baseはOracleベース・ディレクトリ、cluster_nameはKafkaクラスタ名)を使用して、Oracleベース・パス・ディレクトリにクラスタ構成オペレーティング・システム・ディレクトリを作成します。たとえば:

    mkdir /u01/app/oracle/osak/kafkaclus2/config

    SYSDBAとしてデータベースにログインし、SQLを起動して、対応するOracleディレクトリ・オブジェクトを作成します。この例では、Kafkaクラスタ名はKAFKACLUS2です:

    SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_CONFIG AS  'u01/app/oracle/osak/kafkaclus2/config';
  3. 空のosakafka.propertiesファイルまたはOSAKチューニングまたはデバッグ・プロパティを含むosakafka.propertiesファイルを作成します。

  4. SQLで、DBMS_KAKFA_ADM.REGISTER_CLUSTER()を使用してKafkaクラスタを登録します。たとえば、サーバー・ホスト名mykafkabootstrap-host、ポート9092をKafkaクラスタKAFKACLUS2に使用します:

    SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER (
              cluster_name  => ‘KAFKACLUS2’,
              bootstrap_servers =>‘Kafka-example-host:9092’, 
              kafka_provider => 
    DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, 
              cluster_access_dir => ‘OSAK_KAFKACLUS2_ACCESS’,
              credential_name => NULL,
              cluster_config_dir => ‘OSAK_KAFKACLUS2_CONFIG’,
               cluster_description =>  'My test cluster kafkaclus2’,
               options  => NULL) 
    from dual;

    構成に成功すると、登録の戻り値は0になります:

    SQL> DBMS_KAFKA_ADM_RE…..
                 0
    
  5. Kafkaユーザーに読取りアクセス権を付与します。次の例では、ユーザーapp2-usrに、KAFKACLUS2という名前のKafkaクラスタへのアクセス権が付与されます:
    SQL> grant read on directory osak_kafkaclus2_access to app2-usr;

21.13 セキュアなKafkaクラスタへのアクセスの構成

セキュアなKafkaクラスタへのアクセスを構成するには、この手順を使用します。

セキュアなKafkaクラスタへのアクセスには、osakafka.propertiesなどの構成ファイルと、SSL/TLS PEMファイルや証明書などの追加のセキュリティ・ファイルが必要です。これらのファイルは、クラスタ構成データベース・ディレクトリ・オブジェクトに格納されます。構成ファイルおよびディレクトリは、オペレーティング・システム・ディレクトリおよびファイル・アクセス権限によって保護されます。

クラスタ構成のオペレーティング・システム・ディレクトリは、Oracleベース・ディレクトリで構成され、Oracle Installatoin所有者またはOracleユーザー(oracle)およびOracle Inventoryグループ(oinstall)が所有しています。OracleユーザーおよびOracle Inventoryグループは、750 (rwxr-x---)に設定されたディレクトリ権限を持つ必要があり、ディレクトリ内のosakafka.propertiesファイルは、540 (rw-r-----)に設定された権限を持つ必要があります。クラスタ構成ディレクトリ内の他のすべてのファイルは、440 (r--r-----)に設定された権限を持つ必要があります。

  • Oracle SQL Access for Kafka構成ファイル(osakafka.properties)が作成され、クラスタ構成データベース・ディレクトリ・オブジェクトに格納されます。
  • 選択したセキュリティ方式のセキュリティ・ファイル(Kerberos、SSL、PEMファイルを使用したTLS/SSL、およびそのために作成された証明書など)は、クラスタ構成データベース・ディレクトリ・オブジェクトに格納されます。

手順:

  1. Kafkaクラスタへのアクセスを制御するクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成します。このデータベース・ディレクトリ・オブジェクトの権限付与は、KafkaクラスタにアクセスできるOracle Databaseユーザーの制御に使用されます。このデータベース・ディレクトリには空のパスがあります。つまり、対応するオペレーティング・システム・ディレクトリは必要なく、ファイルは含まれません。

    たとえば、空のパスを持つoaskaccess_kafkaclust1という名前のクラスタ・アクセス・データベース・ディレクトリ・オブジェクトを作成します。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。

    SQL> CREATE DIRECTORY osakaccess_kafkaclus1 AS ''; 
  2. ターゲットのOracle Databaseサーバーで、パスOracle_base/osak/cluster_name/config (Oracle_baseはOracleベース・ディレクトリ、cluster_nameはKafkaクラスタ名)を使用して、Oracleベース・パス・ディレクトリにクラスタ構成オペレーティング・システム・ディレクトリを作成します。たとえば:

    mkdir /u01/app/oracle/osak/kafkaclus1/config
  3. SYSDBAとしてデータベースにログインし、SQLを起動して、対応するOracleディレクトリ・オブジェクトをターゲットのOracle Databaseに作成します。データベース・オブジェクト名にOSAK_clustername_accessを使用することをお薦めします。clusternamneはKafkaクラスタの名前です。たとえば:

    CREATE DIRECTORY OSAK_KAFKACLUS1_CONFIG
       AS '/u01/app/oracle/osak/kafkaclus1/config';
  4. 使用するセキュリティ・プロトコルに基づいて、クラスタ構成ディレクトリにosakafka.propertiesファイルを作成します。このファイルは、librdkafkaクライアント・プロパティ・ファイルに似ています。

    次の例では、クライアント認証を使用してセキュリティ・プロトコルにSecure Socket Layer (SSL)を使用するようにosakafka.propertiesファイルが構成されます。

    security.protocol=ssl
    ssl.ca.location=ca-cert
    ssl.certificate.location=client_myhostname_client.pem 
    ssl.key.location=client_myhostname_client.key
    ssl.key.password=password-that-is-ignored
  5. osakafka.propertiesで参照されるセキュリティ・ファイルをクラスタ構成ディレクトリにコピーします。たとえば、ca-certパスが/etc/ssl/certs/である場合:

    $cp /etc/ssl/certs/ca-cert /u01/app/oracle/osak/kafkaclus1/config;
    $cp /etc/ssl/certs/client-myhostname-client.pem /u01/app/oracle/osak/kafkaclus1/config;
    $cp /etc/ssl/certs/client-myhostname-client.key /u01/app/oracle/osak/kafkaclus1/config;
    
  6. 資格証明を設定します:
    • osakafka.propertiesファイルでSSL.key.locationまたはsasl.usernameプロパティを使用する場合:

      SSL SASI認証を使用して、Kafkaクラスタでの認証に必要なパスワードを格納するためのデータベース資格証明を作成します。対応するパスワード・プロパティssl.key.passwordまたはsasl.passwordは、クラスタ登録プロセス中にDBMS_KAFKAによって自動的に追加されます。たとえば:

      begin
          dbms_credential.create_credential(
              credential_name => 'KAFKACLUS1CRED1',
              username => 'KAFKACLUS1',
              password => 'enter-ssl-key-password-or-sasl-password);
      end;
      /    
    • Kafkaクラスタが認証メカニズムとしてGSSAPI/Kerberosを使用する場合:

      osakafka.propertiesファイルにリストされているKafkaプリンシパルのデータベース・システムでKerberosチケットを取得します

  7. SYSDBAとしてログインし、SQLを起動し、SYS.DBMS_KAFKA_ADM.REGISTER_CLUSTER()プロシージャを使用してKafkaクラスタを登録します。次の例では、KafkaクラスタKAFKACLUS1が登録されます:

    select DBMS_KAFKA_ADM.REGISTER_CLUSTER ('KAFKACLUS1',
                 'mykafkabootstrap-host:9092', 
                 DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, 
                 'OSAK_KAFKACLUS1_ACCESS'
                 'KAFKACLUS1CRED1', 
                 'OSAK_KAFKACLUS1_CONFIG',
                 'My test cluster kafkaclus1') from dual;
    

    成功すると、0が返されます。たとえば:

    SQL> DBMS_KAFKA_ADM_RE…..
                 0
    
  8. Kafkaユーザーに読取りアクセス権を付与します。次の例では、ユーザーapp1-usrに、KAFKACLUS1という名前のKafkaクラスタへのアクセス権が付与されます:
    SQL> grant read on directory OSAK_KAFKACLUS1_ACCESS to app1-usr;

21.14 Oracle SQL Access to Kafkaクラスタの管理

Oracle SQL Access to Kafkaを使用してKafkaクラスタ定義を更新、一時的に無効化および削除する方法を確認してください

21.14.1 Kafkaクラスタへのアクセスの更新

Kafkaクラスタ環境が変更された場合、それらの変更のクラスタ定義および構成を更新できます。

Kafkaクラスタ定義の存続期間中、クラスタ定義を更新する必要がある場合は、DBMS_KAFKA_ADM.UPDATE_CLUSTER_INFOおよびDBMS_KAFKA_ADM.CHECK_CLUSTERを使用できます。

21.14.2 Kafkaクラスタへのアクセスの無効化または削除

Oracle SQL Access to Kafkaクラスタを一時的に無効にしたり、不要になった場合はその接続を削除したりできます。

例21-11 Kafkaクラスタの無効化

Kafka環境の一時的な停止中に、Kafkaクラスタへのアクセスを一時的に無効にできます

  • DBMS_KAFKA_ADM.DISABLE_CLUSTER (後ろに次が続く)
  • DBMS_KAFKA_ADM.ENABLE_CLUSTER (Kafka環境がバックアップされている場合)

例21-12 Kafkaクラスタの削除

クラスタ定義が不要になった場合、OSAK管理者はクラスタ定義を削除できます

  • DBMS_KAFKA_ADM.DEREGISTER_CLUSTER

21.15 KafkaデータとOracle SQL Access to Kafkaの使用に関するガイドライン

アプリケーション開発計画の一部としてガイドライン、制限および推奨事項を確認します。

21.15.1 Kafka一時表およびアプリケーション

Oracle SQL Access to Kafkaビューおよび対応する一時表は、一意のKafkaアプリケーション(グループID)にバインドされており、そのアプリケーションにかわってトピック内の1つ以上のパーティションに排他的にアクセスする必要があります。

これらのガイドラインを使用して、アプリケーションの制限を支援します。

KafkaグループIDおよびOracle SQL Access to Kafka一時表

標準のOracle表およびビューとは異なり、Apache Kafkaデータの消費に関するルールに従って、Kafka一時表を複数のアプリケーションで共有することはできません。Kafkaデータの場合、各一時表は、特定の時点でKafkaから直接フェッチされるデータのスナップショットであり、Kafkaクラスタを識別する正規名形式、アプリケーション名、ビューID、Kafka内のコンシューマ・グループID (groupID)に関連付けられたアプリケーションのかわりにクラスタまたはトピック内の1つ以上のパーティションにアクセスする特定のビューを識別する整数があります。Oracle Databaseで作成された一時ビューおよび表は、一意のKafkaアプリケーション(groupIDで識別)にバインドされており、そのアプリケーションのかわりにトピック内の1つ以上のパーティションに排他的にアクセスする必要があります。これらのパーティションへのアクセスを他のアプリケーションと同時に共有することはできません。この制限は、Oracleアプリケーション・インスタンスにまで及びます。Oracle SQL Access to Kafkaビューおよびその関連の一時表は、そのアプリケーション専用である必要があります。同じKafkaトピックまたはパーティション・データを問い合せるよう複数のアプリケーションを構成する場合、これらのアプリケーションがそれ自体を別のアプリケーション(つまり、異なる一意のKafkaグループIDを持つアプリケーション)として識別し、独自のグループIDとアプリケーション・アイデンティティ、および追跡する一連の独自のオフセットが反映された、独自のOracle SQL Access to Kafkaアプリケーションを作成する必要があります。

Oracle SQL Access to Kafkaでビューおよび表を使用するためのガイドライン

アプリケーションのビューおよび表を、そのデータで実行する分析の種類に従って作成します。

アプリケーションで分析にOracle SQLを使用するには、問合せ対象のデータのすべてのパーティションを取得するそのアプリケーションのOracle SQL Access to Kafkaビューを作成することをお薦めします。単一のアプリケーション・インスタンスによるアクセスごとに、トピック内のすべての新しいKafkaデータが取得され、アプリケーションが格納または表示できる集計情報が生成されます。

Oracle SQLを使用して分析を実行せず、かわりにアプリケーション自体で複雑なロジックを使用する場合は、アプリケーション・インスタンスをスケールアウトし、各Oracle SQL Access to Kafkaビューを単一のアプリケーション・インスタンスのかわりに単一のパーティションにアクセスさせることをお薦めします。この場合は通常、Kafkaデータは標準のOracle表に結合され、アプリケーションに返されるデータをエンリッチします。

アプリケーションでより多くの分析が実行される前に一部のSQL分析および結合が実行される場合、トピック内のパーティションの一部のサブセットにビューをマップすることをお薦めします。

21.15.2 ストリーミングを使用した複数のアプリケーションとのKafkaデータの共有

複数のアプリケーションがKafkaデータを使用できるようにするには、Oracle SQL Access to Kafkaを使用してKafka表をユーザー表にストリーミングします。

Kafkaデータを複数のOracleユーザーと共有することで、この表が特定のグループIDに関連付けられないようにするために、アプリケーション・ユーザーに、PL/SQLプロシージャDBMS_KAFKA.EXECUTE_LOAD_APPを使用してOracle SQL Access to Kafkaをロード・モードで実行させて、そのユーザーが所有する表を作成することをお薦めします。このオプションを使用すると、単一のアプリケーション・インスタンスがPL/SQLのロード・プロシージャを定期的に実行し、すべての新しいデータをKafkaトピックからOracle Database表に増分的にロードします。データが表にロードされた後、一時表に適用される制限なく、その表へのアクセス権を付与された標準のOracle Databaseアプリケーションからアクセス可能にできます。

21.15.3 Kafka表の削除および再作成

KafkaオフセットはDBMS_KAFKAメタデータ表によって管理されるため、Kafkaトピック構成の変更には、Oracle SQL Access to Kafkaアプリケーションに対する手動更新が必要になる場合があります。

Oracleアプリケーション・インスタンスが、どのKafka表コンテンツが読み取られたか、およびそれがどこで読み取られたかを識別できるようにするには、Kafkaトピックのパーティション・オフセットをアプリケーション・インスタンスごとに追跡する必要があります。

Kafkaでは、オフセットをコミットするための3つのモデルがサポートされています:

  • 自動コミット: Kafkaが、短い時間スケジュールでフェッチされた最後のオフセットをコミットします
  • 手動コミット: アプリケーションが、オフセットをコミットするリクエストをKafkaに送信します
  • アプリケーション管理のコミット: Kafkaのコミットはアプリケーションによって完全に管理されます。

Oracleでは、アプリケーション管理のコミットが使用されます。これらのコミットでは、Kafkaはこれを、Kafkaに明示的にコミットすることなく手動コミットを宣言するアプリケーションとみなします。オフセットは、DBMS_KAFKAメタデータ表で排他的に記録および保守されます。これらの表は、Oracle DatabaseのACIDトランザクション・プロパティによって保護されます。トランザクションの整合性を保証するために、Oracle SQL Access to Kafkaでは、Kafka自動コミットまたはKafka手動コミットをサポートしていません。

Kafkaトピックを削除して再作成する場合は、シナリオに応じて、その表を手動で更新する必要があります:

例21-13 同じパーティションを持つビューの削除およびリセット

パーティションの数が元のKafkaトピック構成と同じままである場合、ビューをリセットしてOracle SQL Access to Kafkaビューにリセットし、再作成されたトピック内のKafkaパーティションの先頭から処理を開始する必要があります。ビューをリセットするには、プロシージャDBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML')をコールします。ここで、view_nameはビューの名前です。

例21-14 パーティションが少ないビューの削除およびリセット

このオプションは使用できません。パーティションの数が元のKafkaトピック構成より少ない場合は、このトピックに関連付けられたOracle SQL Access to Kafkaアプリケーションを削除して再作成する必要があります。

例21-15 パーティションが多いビューの削除およびリセット

パーティションの数が元のKafkaトピック構成より多い場合は、プロシージャDBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML') (view_nameはビューの名前)をコールしてOracle SQL Access to Kafkaビューをリセットしてから、このトピックを使用してOracle SQL Access to KafkaアプリケーションごとにプロシージャDBMS_KAFKA.ADD_PARTITIONSをコールする必要があります。

21.16 アプリケーション用のKafkaクラスタ・アクセス・モードの選択

Oracle SQL Access to Kafkaを使用するには、アプリケーションに必要なデータ・アクセスのモードを決定します。

21.16.1 Oracle Database表へのKafkaレコードの増分ロードの構成

アプリケーションがKafkaトピックからOracle Database表にデータを増分ロードできるようにするには、Oracle SQL Access to Kafkaをロード・モードで使用します。

EXECUTE_LOAD_APPプロシージャを使用して増分ロードを実行するようにOracle SQL Access to Kafkaを構成すると、Kafkaデータを標準のOracle表に移動できます。この表には、Oracle SQL Access to Kafka一時表を使用するときに1つのリーダー制約を課さずに複数のアプリケーションからアクセスできます。

KafkaデータをOracle Database表に増分ロードするために、アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_LOAD_APPをコールして、後続のDBMS_KAFKA.EXECUTE_LOAD_APPコールの状態を初期化することによって、ロード・アプリケーションであることを宣言します。DBMS_KAFKA.CREATE_LOAD_APPプロシージャは、トピックのすべてのパーティションに対して単一のビューを作成します。

トピック全体のデータが不要な場合は、DBMS_KAFKA.INIT_OFFSET[_TS]プロシージャをコールしてKafkaデータをロードするためのKafkaトピック・パーティションの開始点を設定するようにアプリケーションを構成することもできます。

DBMS_KAFKA.EXECUTE_LOAD_APPプロシージャはアプリケーション・ループでコールされ、前のコールが中断した場所からKafkaトピックの現在の高水位標にデータをロードします。この手順は自律型トランザクションで実行されます。

KafkaトピックからOracle Database表にデータをロードするには:

  • DBMS_KAFKA.CREATE_LOAD_APPを使用して、Oracle SQL Access to Kafkaロード・アプリケーションを作成します
  • オプションで、DBMS_KAFFA_INIT_OFFSET_TSまたはDBMS_KAFKA_INIT_OFFSETを使用して、最初に読み取られるKafkaレコードを設定します
  • 完了までLOOPします
    • DBMS_KAFKA.EXECUTE_LOAD_APPを使用して、中断した場所から現在の最高水位標までのKafkaデータをロードします。
  • DBMS_KAFKA.DROP_LOAD_APPを使用して、ロード・アプリケーションを削除します

21.16.2 Oracle SQL問合せでのKafkaレコードへのストリーミング・アクセス

Kafkaトピックに、トピックの先頭から、またはKafkaトピックの特定の開始点から順次アクセスするには、Oracle SQL Access to Kafkaをストリーミング・モードで使用します。

アプリケーションでKafkaトピックに順次アクセスする必要がある場合、Oracle SQL Access to Kafkaをストリーミング・モードで構成します。このモードでは、Oracle SQL Access to Kafka一時表を使用したSQL問合せで、アプリケーション処理ループ内でKafkaレコードに順次アクセスできます。このユースケースでは、アプリケーションは、PL/SQLプロシージャDBMS_KAFKA.CREATE_STREAMING_APPをコールして、KafkaビューへのOracle SQLアクセスの後続の問合せの状態を初期化することにより、ストリーミング・アプリケーションであることを宣言します。この手順では、ビューの作成に加えて、ビューごとにグローバル一時表も作成します。また、INIT_OFFSET[_TS]プロシージャを使用して、アプリケーションのKafkaトピック・パーティションの開始点を設定することもできます。開始点として設定すると、最初の問合せが開始点からKafkaデータを読み取ります。これにより、アプリケーションが処理ループで次のステップを実行できるようになります:

  1. DBMS_KAFKA.CREATE_STREAMING_APPをコールして、Oracle SQL Access to Kafkaストリーミング・アプリケーションを作成します。
  2. (オプション) DBMS_KAFFA_INIT_OFFSET_TSまたはDBMS_KAFKA_INIT_OFFSETをコールして、最初に読み取るKafkaレコードを設定します。
  3. 完了までLOOPします:
    1. DBMS_KAFKA.LOAD_TEMP_TABLEをコールして、Kafkaから次の行セットを含むグローバル一時表をロードします
    2. 取得されたプロセス・データをOSAKグローバル一時表からSELECTします
    3. 処理が成功した場合は、DBMS_KAFKA.UPDATE_OFFSETをコールして、最後に読み取られたKafkaオフセットを更新します
    4. COMMITを使用してオフセット追跡情報をコミットします。
  4. 終了したら、DBMS_KAFKA.DROP_STREAMING_APPをコールしてアプリケーションを削除します。

PL/SQLプロシージャDBMS_KAFKA.UPDATE_OFFSETは、Oracle SQLアクセスで識別されるすべてのパーティションについて、KafkaグループIDのKafkaパーティション・オフセットをKafkaビューに透過的に進めます。これにより、DBMS_KAFKA.LOAD_TEMP_TABLEのすべてのコールで、一連の新しい未読のKafkaレコードが取得され、処理されます

トランザクションがまだ開始されていない場合、UPDATE_OFFSETはOracleトランザクションを開始し、メタデータ表に最後のオフセットを記録することに注意してください。このため、トランザクションがセッション情報を失わないようにするには、UPDATE_OFFSETをコールするたびにトランザクションをコミットするようにアプリケーションを構成する必要があります。トランザクションをコミットした後、Oracle SQL Access to KafkaによってOracleトランザクション内のオフセットが管理されるため、レコードは失われることも、再度読み取られることもありません。トランザクションが完了しなかった場合、オフセットは進められません。アプリケーションは、データの読取りを再開する場合、以前の読取りを停止した場所からKafkaデータの読取りを再開できます。

21.16.3 Oracle SQL問合せでのKafkaレコードに対するシーク可能なアクセス

2つのタイムスタンプ間でKafkaレコードにランダムにアクセスするには、Oracle SQL Access to Kafkaをシーク可能モードで使用します

Oracle SQL Access to Kafkaのシーク可能モードでは、アプリケーションは、(通常は、ストリーミング・アクセスを行うピア・アプリケーションによって識別される)目的のタイムスタンプ間でKafkaレコードを読み取ることができます。このモードでは、DBMS_KAFKA.LOAD_TEMP_TABLEプロシージャが一時表に移入する時間範囲を定義する開始タイムスタンプと終了タイムスタンプを指定します。アプリケーションは、シーク可能モードでKafkaにアクセスするための状態を初期化するためにPL/SQLプロシージャDBMS_KAFKA.CREATE_SEEKABLE_APPをコールして、シーク可能アプリケーションであることを宣言します。この手順では、トピックのすべてのパーティションにわたってビューおよび対応するグローバル一時表を作成します。DBMS_KAFKA.SEEK_OFFSET_TSプロシージャがコールされ、問合せ元の時間範囲が指定されます。アプリケーションは、DBMS_KAFKA.LOAD_TEMP_TABLEプロシージャをコールする前にSEEK_OFFSET_TSをコールして、次の行セットを含む一時表をロードします。

2つのタイムスタンプ間のKafkaレコードにアクセスするために「シーク可能」モードでKafkaデータを問い合せるには

  • DBMS_KAFKA.CREATE_SEEKABLE_APPを使用して、Oracle SQL Access to Kafkaシーク可能アプリケーションを作成します
  • 完了までLOOPします
    • DBMS_KAFKA.SEEK_OFFSET_TSを使用して、Kafkaトピック内のユーザー定義時間範囲を検索します
    • DBMS_KAFKA.LOAD_TEMP_TABLEをコールして、Kafkaから行セットを含むグローバル一時表をロードします
    • OSAKグローバル一時表からSELECTします
    • データを処理します
  • アプリケーションで実行した場合、DBMS_KAFKA.DROP_SEEKABLE_APPを使用します

21.17 Oracle SQL Access to Kafkaアプリケーションの作成

LOADアプリケーションでKafkaデータを問い合せるには、次のプロシージャを使用してKafkaデータをOracle Database表にロードします。

ロード・プロシージャの一般的な用途は次のとおりです:

DBMS_KAFKA.CREATE_LOAD_APP: このプロシージャは、Oracle表へのロードの設定に使用されます

DBMS_KAFKA.INIT_OFFSET[_TS] (OPTIONAL): このプロシージャは、一連のロード操作の開始点を制御するために、すべてのトピック・パーティションでオフセットを設定するために使用されます。このプロシージャは、プロシージャを実行するKafkaトピックから新しい行をロードする必要がなくなるまで繰り返します。

DBMS_KAFKA.EXECUTE_LOAD_APP: このプロシージャは、Kafkaトピックからすべてのトピック・パーティションの高水位標に新しい未読レコードをロードするために使用されます

DBMS_KAFKA.DROP_LOAD_APP: このプロシージャは、プロシージャを実行しているKafkaトピックからのロードが完了したときに使用します。

21.17.1 Oracle SQL Access to Kafkaを使用したロード・アプリケーションの作成

Oracle Database表にデータをロードする場合は、ロード・モードDBMS_KAFKAを使用します。

Oracle SQL Access to Kafkaロード・アプリケーションは、Kafkaトピックのすべてのパーティションからデータを取得し、そのデータを処理のためにOracle Database表に配置します。また、Kafkaクラスタでライブ・トピックおよびKafkaトピックに関するパーティション情報を調べるために使用されるメタデータ・ビューも作成します(存在しない場合)。このビューは一度作成されたら、同じクラスタを共有するすべてのアプリケーションに対応します。作成されたLOADアプリケーションに対してDBMS_KAFKA.EXECUTE_LOAD_APPをコールできるのは、1つのアプリケーション・インスタンスのみです。

例21-16 DBMS_KAFKA.EXECUTE_LOAD_APPを使用した表へのデータのロード

この例では、ロード・アプリケーション用に1つのビューおよび関連する一時表を作成します。Kafkaクラスタ名はExampleCluster、アプリケーション名はExampleAppです。Kafkaトピックは、4つのパーティションを持つトピックであるExampleTopicです:


DECLARE
v_options VARCHAR2;
BEGIN
     v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
 SYS.DBMS_KAFKA.CREATE_LOAD_APP (
                       ‘ExampleCluster’,
                       ‘ExampleApp’,
                       ‘ExampleTopic’,
                       v_options);
END;
/

例21-17 DBMS_KAFKA.EXECUTE_LOAD_APPを使用した表へのデータの定期的なロード

一連のアプリケーション・ビューからKafkaデータを処理するかわりに、最新のデータを定期的に表にフェッチし、単にKafkaからOracle Database表にデータをロードすることを選択することもできます。この例のDBMS_KAFKA.EXECUTE_LOAD_APPプロシージャは、Kafkaクラスタから最新データを取得し、このデータを表ExampleLoadTableに挿入します。この表のデータを使用するアプリケーションには、DBMS_KAFKA.INIT_OFFSET[_TS]をコールしてロードの開始点を設定するオプションがあります。


DECLARE
    v_records_inserted INTEGER;
BEGIN
    SYS.DBMS_KAFKA.EXECUTE_LOAD_APP (
                           ‘ExampleCluster’,
                           ‘ExampleLoadApp’,
                           ‘ExampleLoadTable’,
                           v_records_inserted);
END;

例21-18 DBMS_KAFKA.DROP_LOAD_APPまたはDBMS_KAFKA.DROP_ALL_APPSを使用したKafkaビューおよびメタデータの削除

Oracle SQL Access to Kafkaロード・アプリケーションが不要になった場合は、DBMS_KAFKA.DROP_LOAD_APPをコールしてビューおよびメタデータを削除できます。次の例では、KafkaクラスタはExampleCluster、アプリケーションはExampleAppです。

EXEC SYS.DBMS_KAFKA.DROP_LOAD_APP
      (‘ExampleCluster’, ‘ExampleApp’);

Oracle SQL Access to Kafkaアプリケーションへの1つ以上のOracle SQLアクセスが存在しなくなった場合は、DBMS_KAFKA.DROP_ALL_APPSをコールして、特定のクラスタのすべてのアプリケーションを削除できます

EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS
      (‘ExampleCluster’);

21.17.2 Oracle SQL Access to Kafkaを使用したストリーミング・アプリケーションの作成

Oracle Database表にデータをロードする場合は、ロード・モードDBMS_KAFKAを使用します。

ストリーミングを使用すると、データを大規模に処理できます。Oracle SQL Access to Kafkaをストリーミング・モードで使用して、複数のアプリケーション・インスタンスを作成できます。複数のインスタンスにより、アプリケーションは、1つ以上のスレッド、プロセスまたはシステムで同時に実行されているアプリケーション・インスタンス間でKafkaデータを分析するワークロードをスケール・アウトおよび分割できます。

Oracle SQL Access to Kafkaストリーミング・アプリケーションには、Kafkaグローバル一時表への一連の専用Oracle SQLアクセスと、KafkaビューへのOracle SQLアクセスが含まれます。これらの一時表およびビューは、Kafkaトピック内のパーティションから新しい未読レコードを取得するために使用できます。

また、KafkaクラスタでアクティブなトピックおよびKafkaトピックに関するパーティション情報を調べるために使用されるメタデータ・ビューも作成します(存在しない場合)。このビューは一度作成されたら、同じクラスタを共有するすべてのアプリケーションに対応します。

各Oracle SQL Access to Kafkaグローバル一時表およびその関連ビューは、Oracle SQL Access to Kafkaアプリケーションの1つのインスタンスによって排他的に使用されます。

各アプリケーション・インスタンスはLOAD_TEMP_TABLEをコールします。これにより、専用のOracle SQL Access to Kafkaグローバル一時表に、関連付けられたビューから取得されたKafka行が移入されます。これにより、アプリケーションは、Oracle SQL Access to Kafkaグローバル一時表のコンテンツに対して1つ以上のSQL問合せを実行できるようになります。アプリケーションが現在の一連のKafka行で実行されると、UPDATE_OFFSETおよびCOMMITをコールします。

STREAMINGモード・アプリケーションは、アプリケーション用に必要なOracle SQL Access to Kafkaビューおよび一時表の数を選択するようアプリケーションを構成できるという点で、LOADまたはSEEKINGアプリケーションとは異なります。Oracle SQL Access to Kafkaアプリケーションの他のタイプと同様、各アプリケーション・インスタンスは、1つの一意のOracle SQL Access to Kafka一時表を排他的に問い合せます。各1つの一意のビューおよびグローバル一時表名には、クラスタ名、アプリケーション名およびアプリケーション・インスタンス識別子(ID)が含まれます。

アプリケーションを作成する際には、作成する1つの一意のビューおよび一時表のペアの数が1からNの間であることに注意してください。ここで、NはKafkaトピック内のパーティションの数です。

実行時に、各アプリケーション・インスタンスは独自のユーザー・セッションで実行され、1つの一意のグローバル一時表およびその関連ビューを処理します。したがって、アプリケーション・インスタンスを同時に実行するには、少なくともKafkaトピック内のパーティションと同じ数(つまり、Nの値)のセッションをユーザーに割り当てる必要があります。view_countがユーザー当たりの最大セッション数を超えると、このコールは失敗し、ユーザーに割り当てられているセッションが不足していることを示すエラーが表示されます。特定のOracle SQL Access to Kafkaビューおよびその関連グローバル一時表にバインドされたKafkaパーティションの数は、作成されたビューの数および存在するパーティションの数によって異なります。Oracle SQL Access to Kafkaは、各ビューに割り当てられるパーティションの数を均等に分散します。

例21-19 DBMS_KAFKA.CREATE_STREAMING_APPを使用した表へのデータのストリーミング

この例では、ExampleTopicという名前のトピックのデータを使用して、ストリーミング・モード・アプリケーションの一連の4つのビューおよび関連する一時表を作成します。このトピックには4つのパーティションがあり、各ビューおよび一時表は1つのパーティションに関連付けられています:

DECLARE
   v_options  VARCHAR2;
BEGIN
  v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
  SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                ‘ExampleCluster’,
                ‘ExampleApp’,
                ‘ExampleTopic’,
                v_options, 
                4);
END;
/

例21-20 DBMS_KAFKA.CREATE_STREAMING_APPを使用した単一の表へのデータのロード

この例では、ストリーミング・モードを使用して、トピックの4つのパーティションすべてに関連付けられているアプリケーションに対して1つのビューと関連する一時表を作成します:

DECLARE
v_options VARCHAR2;
BEGIN
              v_options := ‘{“fmt” : “DSV”, “reftable” : “user_reftable_name”}’;
 SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                ‘ExampleCluster’,
                ‘ExampleApp’,
                ‘ExampleTopic’,
                v_options,
                1);
END;
/

例21-21 DBMS_KAFKA.DROP_STREAMING_APPまたはDBMS_KAFKA.DROP_ALL_APPSを使用したKafkaビューおよびメタデータの削除

Oracle SQL Access to Kafkaロード・アプリケーションが不要になった場合は、DBMS_KAFKA.DROP_STREAMING_APPをコールしてビューおよびメタデータを削除できます。次の例では、KafkaクラスタはExampleCluster、アプリケーションはExampleAppです。

EXEC SYS.DBMS_KAFKA.DROP_STREAMING_APP
      (‘ExampleCluster’, ‘ExampleApp’);

Oracle SQL Access to Kafkaアプリケーションへの1つ以上のOracle SQLアクセスが存在しなくなった場合は、DBMS_KAFKA.DROP_ALL_APPSをコールして、特定のクラスタのすべてのアプリケーションを削除できます

EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS
      (‘ExampleCluster’);

21.17.3 Oracle SQL Access to Kafkaを使用したシーク可能アプリケーションの作成

過去に発生した問題を調査し、開始タイムスタンプと終了タイムスタンプの間のKafkaトピックにランダムにアクセスする場合は、シーク可能モードのDBMS_KAFKAを使用します。

シーク可能モードでKafkaトピックにアクセスする前に、DBMS_KAFKA.CREATE_SEEKABLE_APPを使用してOracle SQL Access to Kafkaアプリケーションを作成する必要があります。このパッケージは、シーク可能モードで使用できるアプリケーションを作成します。

Oracle SQL Access to Kafkaをシーク可能モードで使用すると、Kafkaデータを使用して、過去に発生した問題を調査できます。データがまだKafkaストリーム内に存在する場合は、DBMS_KAFKA.CREATE_SEEKABLE_APPをコールしてシーク可能アプリケーションを作成できます。シーク可能モード・アプリケーションを作成したら、プロシージャDBMS_KAFKA.SEEK_OFFSET_TSをコールして、データ・レコードの範囲を取得するようOracle SQL Access to Kafkaビューにリクエストできます。たとえば、生産の問題が午前3時頃に発生し、原因を調査する必要があることをITコンサルタントに知らせたとします。コンサルタントは次のプロシージャを使用して、一時表をロードしてから、その時間の1時間分に相当するデータを取得することを選択できます:

アプリケーションを作成する際には、作成する1つの一意のビューおよび一時表のペアの数が1からNの間であることに注意してください。ここで、NはKafkaトピック内のパーティションの数です。

実行時に、各アプリケーション・インスタンスは独自のユーザー・セッションで実行され、1つの一意のグローバル一時表およびその関連ビューを処理します。したがって、アプリケーション・インスタンスを同時に実行するには、少なくともKafkaトピック内のパーティションと同じ数(つまり、Nの値)のセッションをユーザーに割り当てる必要があります。view_countがユーザー当たりの最大セッション数を超えると、このコールは失敗し、ユーザーに割り当てられているセッションが不足していることを示すエラーが表示されます。特定のOracle SQL Access to Kafkaビューおよびその関連グローバル一時表にバインドされたKafkaパーティションの数は、作成されたビューの数および存在するパーティションの数によって異なります。Oracle SQL Access to Kafkaは、各ビューに割り当てられるパーティションの数を均等に分散します。

例21-22 DBMS_KAFKA.CREATE_SEEKABLE_APPを使用したKafkaデータの日付範囲の検索

この例では、生産の問題が午前3時頃に発生し、原因を調査する必要があることをITコンサルタントに知らせたとします。コンサルタントは、コンサルタントは次のプロシージャを使用して、一時表をロードしてから、その時間の1時間分に相当するデータを取得することを選択できます。ここで、KafkaクラスタはEXAMPLECLUSTER、列はEventColおよびExceptionColです:

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                         ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                         TO_DATE (‘2022/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                         TO_DATE (‘2022/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                   (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT EventCol, ExceptionCol FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;

例21-23 DBMS_KAFKA.CREATE_SEEKABLE_APPを使用した異常に関連するレコードの検索

Kafkaストリームへの順次アクセスを使用するアプリケーションが潜在的な異常を検出し、異常表に行を挿入するとします。異常表には、Kafkaタイムスタンプと、トレースに重要として指定されたその他のデータが含まれます。別のアプリケーションは、この情報を使用して、疑われるレコード周辺のレコードを取得し、この異常に関連する他の問題があるかどうかを確認できます。この例では、ITコンサルタントが調べる異常に関連する列は、UserColおよびReqeustColです。これを実現するには、次のプロシージャを実行して一時表をロードし、アプリケーション・ロジックを選択して結果に適用します:

SYS.DBMS_KAFKA.SEEK_OFFSET_TS (
                        ‘ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0’,
                        TO_DATE (‘2020/07/04 02:30:00’, ‘YYYY/MM/DD HH:MI:SS’,
                        TO_DATE (‘2020/07/04 03:30:00’, ‘YYYY/MM/DD HH:MI:SS’));

SYS.DBMS_KAFKA.LOAD_TEMP_TABLE 
                     (ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0);
SELECT UserCol, RequestCol FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
--application logic

例21-24 DBMS_KAFKA.DROP_SEEKABLE_APPまたはDBMS_KAFKA.DROP_ALL_APPSを使用したKafkaビューおよびメタデータの削除

Oracle SQL Access to Kafkaロード・アプリケーションが不要になった場合は、DBMS_KAFKA.DROP_SEEKABLE_APPをコールしてビューおよびメタデータを削除できます。次の例では、KafkaクラスタはExampleCluster、アプリケーションはExampleAppです。

EXEC SYS.DBMS_KAFKA.DROP_SEEKABLE_APP
      (‘ExampleCluster’, ‘ExampleApp’);

Oracle SQL Access to Kafkaアプリケーションへの1つ以上のOracle SQLアクセスが存在しなくなった場合は、DBMS_KAFKA.DROP_ALL_APPSをコールして、特定のクラスタのすべてのアプリケーションを削除できます

EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS
      (‘ExampleCluster’);

21.18 アプリケーションでのKafkaクラスタ・アクセスの使用

アプリケーションでのKafkaクラスタ・データ・アクセスの使用方法を学習します。

21.18.1 Oracle SQL Access to Kafkaの問題を診断する方法

Oracle SQL Access to Kafkaで問題が発生した場合は、次のガイドラインを使用して原因を特定し、問題を解決します。

Oracle SQL Access to Kafkaの主な診断問題を次に示します:

初期接続の確立の失敗

このタイプのエラーは次のとおりです:

  • 不正な起動サーバー・リスト
  • 不正な資格証明情報
  • ネットワーキング構成の問題

初回アクセス時の失敗

DBMS_KAFKA CREATE_LOAD_APPCREATE_STREAMING_APPまたはCREATE_SEEKABLE_APPをコールしたときの初回アクセスの失敗には通常、次の原因があります:
  • 欠落しているか不正なトピック
  • 接続の問題

レコード更新中の失敗

通常、このタイプの失敗には次の原因があります:
  • 接続の問題
  • 内部メタデータまたはロジックの問題
  • レコードの欠落
  • Oracle SQL Access to Kafkaビューの形状が入力と一致しない解析エラー。

OracleアプリケーションおよびOracle SQL Access to KafkaがKafkaデータ入力に対応する際の失敗。

このタイプの失敗にはリソースのチューニングが必要です。これらは、Kafkaクラスタ内のトピックへの行の取込み率が、Oracle DatabaseでKafkaレコードを消費するOracle Databaseの能力に近いかそれを超えている場合に発生します。たとえば、一定時間後、Kafkaの未読レコードがOracle Databaseによって消費される前に期限が来たためにKafkaによって無効にされます。

この種のエラーは、ワークロードを決定することで回避または修正します。たとえば、問合せの頻度、Oracle SQL Access to KafkaビューへのOracle SQLアクセスごとの問合せ当たりに処理された典型的なレコード数、使用されている並列度、および分析を実行するアプリケーションで費やされた時間を確認します。ワークロードを決定したら、アプリケーション・スタックがワークロードを満たせることを確認します。アプリケーションおよびOracle DatabaseがアプリケーションまたはOracle Databaseリソースを制限することなく、ピークのKafkaレコードを処理できるように、リソースのサイズを設定します。

スループット・レートが上昇し始めたことがわかると、いくつかのことが役立ちます。たとえば、アプリケーション・ユーザーの並列度を増やしたり、より多くのアプリケーション・インスタンスを起動したり、Kafkaクラスタにパーティションを追加します。

例21-25 Oracle SQL Access to Kafka (OSAK)アプリケーション・エラーの解決

OSAKアプリケーションEXAMPLEAPPがKafkaクラスタEXAMPLECLUSTERからデータをロードしており、次のようなエラーが表示されるとします:

ORA-62721: The specified parallel hint [%0!s] exceeds the granule count {%1!s}.

このエラーの原因は、指定された値が、グラニュル数によって決定される可能な最大並列度を超えていたことです。このようなエラーはどのように解決しますか。

LOAD_TEMP_TABLEおよびEXECUTE_LOAD_APPparallel_hintパラメータは、特定のSELECT文でデータをフェッチするために実行できるパラレル・プロセスの数を決定する並列度(またはDOP)に関連しています。パラレル問合せを最大限に活用するには、parallel_hintパラメータを2から最大許容DOPの間に設定する必要があります。最大DOPは、コールを行うユーザーに許可される最大値、またはOSAKビューに関連付けられたパーティション数のいずれか小さい方です。この原因は、アプリケーションを実行しているデータベースまたはユーザー・アカウントのいずれかが最大許容DOPを超えていることです。

この問題を解決するには、グラニュル数以下の値を指定します。グラニュル数は、DBMS_KAFKA.GET_GRANULE_COUNT関数をコールすることで確認できます:

DECLARE
  v_dop INTEGER;
BEGIN
   LOOP 
       v_dop := SYS.DBMS_KAFKA.GET_GRANULE_COUNT(‘ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0’);
       SYS.DBMS_KAFKA.LOAD_TEMP_TABLE(‘ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0’);
       FOR kafka_record IN (
             SELECT kafka_offset offset 
                    FROM ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0)
       LOOP
              SYS.DBMS_OUTPUT.PUT_LINE (‘Processing record: ‘ || kafka_record.offset);
              --application logic to process the Kafka records
       END LOOP;
       IF (application logic was successful) THEN
            --Update internal metadata to confirm Kafka records were successfully processed
            SYS.DBMS_KAFKA.UPDATE_OFFSET(‘ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0’);
            COMMIT;
       ELSE
            --add your application logic to correct for any failures
       END IF;
   END LOOP;
END;

21.18.2 Oracle SQL Access to Kafkaの問題の識別および解決

問題の識別および解決を支援するために、Oracle SQL Access to Kafkaには、クラスタ表にトレース・ファイル、メッセージ表、操作結果表および状態列があります。

表示される問題の性質を確認し、使用可能なユーティリティを使用して問題を特定して対処します:

  • 接続の問題、ロジックの問題またはKafkaアクセス・レイヤー(Kafkaデータ選択によってコールされるOracle実行可能ファイル): トレース・ファイルを確認します。また、sys.user_kafka_clusters表の状態列を確認できます。
  • DBMS_KAFKAおよびDBMS_KAFKA_ADM APIからの例外: sys.user_kafka_messages表のエラー・メッセージを確認します。
  • 操作ランタイムの問題: Oracle SQL Access to Kafkaデータ取得のパフォーマンスが予想どおりでない場合、sys.user_kafka_ops_results表のメッセージを確認します。

例21-26 接続の問題、ロジックの問題またはKafkaアクセス・レイヤーの問題

トレース・ファイルを使用して問題を識別します。

  • 接続関連の問題の場合、詳細はビュー・オブジェクト・トレースから入手できます。有効にするには、イベントをinit.oraファイルに追加するか、alter systemコマンドを使用して実行時にシステムを更新します。

    次のエントリを初期化ファイル(init.ora)に追加します。

    event='trace[KGRK] disk highest'

    システムを変更します:

     alter system set events 'trace[KGRK] disk highest';

    ノート:

    init.oraファイルの更新を有効にするには、データベースの再起動が必要です。
  • ロジック関連のエラーの場合、すべてのエラー・パスにトレースが含まれます。すべてのメッセージの先頭には文字列kubsCRKが付きます。これらのロジック・エラーによっても、SQL例外が発生します。

  • Oracle SQL Access to KafkaアプリケーションのKafkaアクセス・レイヤーのトレース出力は、TRUEとして渡されたenable引数を使用してDBMS_KAFKA.SET_TRACINGをコールすることで有効になります。トレース出力は、FALSEとして渡されたenable引数を使用して同じ関数をコールすることによって無効になります。

    たとえば:

    アプリケーションがExampleAppである状態で、ExampleClusterという名前のクラスタのトレースを有効にするには、次のように入力します:

    DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', true)

    そのクラスタのトレースを無効にするには、次のように入力します:

    DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', false)

    ノート:

    トレースを有効にするには、データベースに対して次のイベントがすでに有効になっている必要があります:

    event="39431 trace name context forever, level 1" # Enable external table debug tracing  

問題が接続の問題であると判断した場合は、sys.user_kafka_clusters表の状態列を確認します。接続レベルは数値で指定されます:

  • CONNECTED (0): この状態は、Kafkaクラスタへの接続が確立されていることを示します。接続の確立中に発生したエラーは、Kafkaデータのリクエストに関する問題を示します。問題を識別するには、DBMS_KAFKA.SET_TRACING APIを使用してトレースを有効にし、問題を再現してから、関連するトレース・ファイルでセッションに「kubsCRK」が含まれるメッセージがないか確認します。user_kafka_messages表のメッセージも確認します。
  • MAINTENANCE (1): この状態は、Kafkaクラスタへの接続が確立されていることを示しますが、接続の確立中に発生するエラーは、Kafkaデータのリクエストに関する問題を示します。この問題を解決するには、DBMS_KAFKA.SET_TRACING APIを使用してトレースを有効にし、問題を再現してから、関連するトレース・ファイルでセッションにkubsCRKが含まれるメッセージがないか確認します。user_kafka_messages表のメッセージも確認します。
  • BROKEN (2): この状態は、Kafkaクラスタへの接続を再確立できないことを示します。トレース・ファイルの機能KUBDのエラーおよびメッセージ表のエラーを探します。
  • DEREGISTERED (3): この状態は、OSAK管理者がクラスタの登録解除を強制し、関連するOracle SQL Access to Kafkaビューにアクセスする必要がないことを示します。これは予期された動作であり、エラーではありません。

例21-27 PL/SQLパッケージの問題

Sys.user_kafka_messages tableを確認します。この表には、過去3日間に記録されたメッセージが含まれます。古いデータは1日に1回自動的にパージされます。データに関連付けられているOSAKビューが削除された場合も、メッセージは削除されます。

例21-28 操作ランタイムの問題

SELECT文を使用して取得された行数が予想より少ないと思われる場合は、sys.user_kafka_ops_results表のデータを使用して、最後の選択時にKafkaから読み取られたレコード数を確認します。

SELECTには正しく解析された行のみが含まれるため、取得された行と読み取られたKafkaレコードの差は、Kafkaトピック内のすべてのデータがDBMS_KAFKA CREATE_LOAD_APPCREATE_STREAMING_APPまたはCREATE_SEEKABLE_APPコール中に指定された形式ではないことを示しています。

Kafkaトピック・データが指定された形式でない場合、回答は次のとおりです:

  1. Kafkaクラスタに公開しているプロデューサを修正します:
  2. アプリケーションを削除して再作成し、適切な形式(DSVの参照表、AVROのAVROスキーマ)を提供するようにします。
  3. JSONデータの場合は、アプリケーションを削除して再作成する前に、データがVARCHAR2 VALUE列の最大列長を超えているかどうかを確認します。データが最大値を超える場合は、アプリケーションを削除して再作成できますが、今回はオプション・パラメータにオプション"jsond" : "clob"を追加します。このオプションを使用すると、OSAKは、列をデフォルトの最大サイズVARCHAR2ではなく、キャラクタ・ラージ・オブジェクト(CLOB)列として作成できます。