103 DBMS_KAFKA_ADM

DBMS_KAFKA_ADMパッケージは、後でアプリケーションのKafkaクラスタ・データへのアクセス権を付与するために使用できるクラスタ定義を作成するためのPL/SQLインタフェースを提供します。

OSAK_ADMIN_ROLEを付与された管理者は、DBMS_KAFKA_ADMパッケージを使用して、Oracle Databaseのビューおよび表からKafkaデータを問い合せるアプリケーションを作成できます。

103.1 DBMS_KAFKA_ADMの概要

DBMS_KAFKA_ADMパッケージを使用すると、KafkaクラスタへのOracle SQLアクセスを管理できます。

Kafkaトピックへのアクセスを設定するには、OSAK_ADMIN_ROLEで付与される権限が必要です。これらの権限を使用して、Kafkaクラスタに接続するときにリソースを管理するために役立つクラスタ定義を指定します。ユーザーがクラスタ定義を共有できるため、基礎となるフレームワークでクラスタへの接続を共有できます。接続を共有すると、メモリーおよびネットワーク・リソースを節約できます。

103.2 DBMS_KAFKA_ADMのセキュリティ・モデル

Oracle SQL Access to Kafkaの管理者ユーザーにターゲットOracle DatabaseのOSAK_ADMIN_ROLEを付与することをお薦めします。

OSAK_ADMIN_ROLEを使用すると、Oracle SQL access to Kafkaアプリケーションを作成および管理するために必要なシステム権限を付与するプロセスが簡略化されます。また、OSAK_ADMIN_ROLEを持つ管理者は、アクセス権を必要とする関連するKafkaビューおよび表があるOracle Databaseオブジェクトへのアクセス権を他のユーザーに付与します。

OSAK_ADMIN_ROLEに加えて、Oracle SQL access to Kafkaアプリケーションの管理者は次のシステム権限を持っている必要があります。

  • CREATE SESSION
  • ALTER SESSION
  • CREATE CREDENTIAL (Kafka SASL-SSL (Simple Authentication and Security Layer)パスワードまたはOSS (Oracle Streaming Service) authTokenを作成するため)
  • CREATE ANY DIRECTORY (クラスタ・アクセスおよびクラスタ構成ディレクトリを作成するため)
  • DROP ANY DIRECTORY (クラスタ・アクセスおよびクラスタ構成ディレクトリを削除するため)
  • sys.dbms_kafka_clustersに対するREAD ON権限
  • sys.dbms_kafka_applicationsに対するREAD ON権限
  • sys.dbms_kafka_messagesに対するREAD ON権限
  • sys.dbms_kafka_admに対するEXECUTE ON権限

103.3 DBMS_KAFKA_ADMの定数

DBMS_KAFKA_ADMパッケージはステートフル・パッケージであり、ここで説明する定数をパッケージ仕様部で宣言されたパッケージ状態の一部として使用します。

DBMS_KAFKA_ADMの定数には、リテラル初期値を持つ定数とパッケージ・ビルドで指定する定数が含まれます。

Kafkaプロバイダ

Kafkaサーバーのプロバイダは、ApacheまたはOracle Cloud Infrastructure (OCI) Oracle Streaming Service (OSS)のいずれかです。
KAFKA_PROVIDER_APACHE CONSTANT VARCHAR2(6) := 'APACHE';
KAFKA_PROVIDER_OSS CONSTANT VARCHAR2(3) := 'OSS';

クラスタ定義の状態

STATE_CONNECTED CONSTANT INTEGER := 0;
STATE_MAINTENANCE CONSTANT INTEGER := 1;
STATE_BROKEN CONSTANT INTEGER := 2;
STATE_DEREGISTERED CONSTANT INTEGER := 3;

Kafka接続モード

Kafka接続モードでは、次のいずれかの接続モード定数を指定します。

  • 高スループット

    接続モード(connmode)がhigh_throughputの場合、この接続に関連付けられているアプリケーションでは、すべてのデータを可能なかぎり高速に配信する必要があります。この接続モードは、STREAMINGおよびLOADINGアプリケーションから使用する場合に最適です。

    例:

    CONNECTION_MODE_HI_THRU CONSTANT VARCHAR2(15) := ‘high_throughput’;
  • 低レイテンシ

    接続モード(connmode)がlow_latencyの場合、基礎となるレイヤーは、可能なかぎり高速に最初の行を戻そうとします。この接続モードは、それぞれのロードで少量のKafkaデータのみを使用するアプリケーションに最も有効です。

    例:

    CONNECTION_MODE_LO_LAT CONSTANT VARCHAR2(11) := ‘low_latency’;
  • オプション

    OPT_CONNECTION_MODE CONSTANT VARCHAR2(30) := ‘connmode’;

103.4 DBMS_KAFKA_ADMのプロシージャの要約

DBMS_KAFKA_ADMのプロシージャを使用すると、KafkaへのOracle SQLアクセスを作成、削除および構成できます。

Kafkaプロバイダ

Kafkaプロバイダ定数を使用して、Kafkaクラスタを定義します。Apache KafkaベースのクラスタとOracle Streaming Serviceベースのクラスタではセキュリティ・モデルが異なるため、正しいセキュア接続パラメータを渡すことができるように、接続するKafka環境のタイプを定義する必要があります。

例:

KAFKA_PROVIDER_APACHE CONSTANT VARCHAR2(6) := 'APACHE';
KAFKA_PROVIDER_OSS CONSTANT VARCHAR2(3) := 'OSS';

クラスタ定義の状態

STATE_CONNECTED CONSTANT INTEGER := 0;
STATE_MAINTENANCE CONSTANT INTEGER := 1;
STATE_BROKEN CONSTANT INTEGER := 2;
STATE_DEREGISTERED CONSTANT INTEGER := 3;

Kafka接続モード

Kafka接続モードでは、レコードのバッチを読み取るためにOracle SQL access to Kafkaが従うKafkaルールを指定します。アプリケーションの要件に応じて、リソースをより効率的に使用できるのはどちらのオプションであるかに基づいて、低レイテンシ・ルールまたは高スループット・ルールのいずれかを指定できます。接続を作成するときには、スループットとレイテンシの間のトレードオフを考慮する必要があります。レコードが生成された時間と、そのデータをアプリケーションで使用できるようにする時間の間のレイテンシによって、低レイテンシを優先するか、高スループットを優先するかを決定する必要があります。

アプリケーションがほぼリアルタイムのデータ・アクセスを必要とするため、Kafkaクラスタによってデータが生成されてから数秒または1分以内にアプリケーションがデータに反応できるようにする場合は、低レイテンシ接続を選択する必要があります。低レイテンシ接続では、Oracle SQL Kafkaの接続バッファがより小さいサイズに設定され、Kafkaトピックへの読取りが頻繁に行われるため、レコードがKafkaトピックに対して生成されてから、Oracle SQL access to Kafkaを使用してアプリケーションによって使用されるまでの遅延は、アプリケーションのパフォーマンス要件内です。Kafkaトピックからのそれぞれの読取りは、比較的少数のレコードで構成されると予想され、Kafkaクラスタからの読取りの回数は多くなります。この形式のKafka接続は、ほぼリアルタイムの同時トランザクションでアプリケーションを更新する場合、つまりアプリケーションが変更を監視してすばやく反応する必要がある場合に特に適しています。ただし、各トランザクションで受け取るデータの量に対してトランザクションの数が比較的多いため、この構成はリソースを大量に消費する可能性があります。

アプリケーションにほぼリアルタイムの要件がないため、レコードの大きなバッチを処理することを優先して更新を遅らせ、15分または1時間という頻度で、あるいは8時間ごとに更新を行うことができるようにする場合は、高スループット接続の方が効率的な選択肢となります。高スループットに構成された接続では、Kafkaデータの各バッチがストリームされる頻度は低くなりますが、各トランザクションでストリームされるデータの量は多くなります。したがって、Oracle SQL access to Kafkaのバッファ・サイズが大きくなり、こうした大きなバッチに対応するようにネットワークI/Oおよびメモリーの割当てが構成されます。これらのトランザクションがストリームされる頻度は低くなるため、Oracle SQL access to Kafkaトランザクションによって消費されるサーバー・リソースの総量は少なくなります

  • 低レイテンシ

    CONNECTION_MODE_LO_LAT CONSTANT VARCHAR2(11) := ‘low_latency’;
  • 高スループット

    CONNECTION_MODE_HI_THRU CONSTANT VARCHAR2(15) := ‘high_throughput’;
  • オプション

    OPT_CONNECTION_MODE CONSTANT VARCHAR2(30) := ‘connmode’;

次の表では、DBMS_KAFKA_ADMのプロシージャを示し、それらについて簡単に説明します。

表103-1 DBMS_KAFKA_ADMパッケージのプロシージャ

サブプログラム 説明

CHECK_CLUSTER

Kafkaクラスタの接続をテストします。

DEREGISTER_CLUSTER

Kafkaクラスタを登録解除します。

DISABLE_CLUSTER

Kafkaクラスタを無効にします。

ENABLE_CLUSTER

Kafkaクラスタを有効にします。

REGISTER_CLUSTER

Kafkaクラスタを登録します。

UPDATE_CLUSTER_INFO

ホスト・サーバーまたはKafkaクラスタ・オプション(あるいはその両方)を更新します。

103.5 CHECK_CLUSTER

DBMS_KAFKA_ADMのプロシージャCHECK_CLUSTERは、Kafkaクラスタの接続をテストします。このプロシージャを使用して、構成されたセキュリティ情報で接続を確立できることを確認します。このファンクションは、クラスタの状態を戻します。

構文

FUNCTION CHECK_CLUSTER(
                   cluster_name   IN VARCHAR2
  ) RETURN INTEGER;

パラメータ

表103-2 DBMS_KAFKA_ADMのCHECK_CLUSTERプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

使用上のノート

Oracle SQL access to Kafkaクラスタの管理者として、クラスタの接続をチェックする必要があります。接続をチェックすることで、Kafkaクラスタについて指定したブートストラップ・サーバー・リストおよびセキュリティ関連の構成が適切であり、接続を正常に確立できることが確認されます。REGISTER_CLUSTERを実行し、セキュリティ構成が確立されたときに、このファンクションを使用して接続をテストできます。クラスタ接続が検証されたら、クラスタ定義をユーザー・アプリケーションで使用できるようにすることができます。

また、DBMS_KAFKA.LOAD_TEMP_TABLEおよびDBMS_KAFKA.EXECUTE_LOAD_APP (どちらもOracle SQL access to Kafkaビューから選択する)をコールしたときにユーザーが問題を報告した場合にも、このファンクションを使用します。

クラスタExampleClusterの初期クラスタ定義を完了し、クラスタを登録したとします。次に、このプロシージャを使用して構成をチェックします。

EXEC SYS.DBMS_OUTPUT.PUT_LINE (
           SYS.DBMS_KAFKA_ADM.CHECK_CLUSTER (‘ExampleCluster’));

103.6 DEREGISTER_CLUSTER

DBMS_KAFKA_ADMのプロシージャDEREGISTER_CLUSTERは、Kafkaクラスタを登録解除します。

ノート:

このプロシージャがクラスタを登録解除して削除するのは、クラスタに関連付けられているユーザー・アプリケーションがない場合のみです

構文

PROCEDURE DEREGISTER_CLUSTER (
                      cluster_name        IN VARCHAR2,
                      forced              IN BOOLEAN DEFAULT FALSE
  );

パラメータ

表103-3 DBMS_KAFKA_ADMのDEREGISTER_CLUSTERプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

forced

(オプション)ユーザー・アプリケーションがまだ定義されている場合でも、クラスタを登録解除済としてマークします。デフォルト: FALSE

値:[TRUE|FALSE]

使用上のノート

クラスタ定義が不要になった場合、OSAK管理者はクラスタ定義を削除できます。

forcedパラメータがTRUEとして渡された場合、クラスタの状態は登録解除済としてマークされます。このクラスタにまだ関連付けられているアプリケーションは機能しなくなります。最後のアプリケーションが削除されると、登録解除されたこのクラスタ定義も削除されます。

次の例では、KafkaクラスタExampleClusterの未使用のクラスタ定義が登録解除されます。

EXEC SYS.DBMS_KAFKA_ADM.DEREGISTER_CLUSTER (‘ExampleCluster’);

クラスタExampleClusterに関連付けられているユーザー・アプリケーション(ビューおよび表)がまだあるが、クラスタを登録解除するとします。次の例では、forcedパラメータをTRUEに設定して、クラスタExampleClusterが登録解除されます。

exec DBMS_KAFKA_ADM.DEREGISTER_CLUSTER(
    cluster_name => 'ExampleCluster',
    forced => TRUE);

103.7 DISABLE_CLUSTER

DBMS_KAFKA_ADMのプロシージャDISABLE_CLUSTERは、Kafkaクラスタを無効にします。

ノート:

このプロシージャが正常に完了すると、アプリケーションの状態がMAINTENANCEに変更されます。

構文

PROCEDURE DISABLE_CLUSTER(
		cluster_name		IN VARCHAR2
);

パラメータ

表103-4 DBMS_KAFKA_ADMのDISABLE_CLUSTERプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

使用上のノート

無効になったクラスタでは、クラスタのセキュリティ情報を取得できないため、Kafka接続を作成したり、Kafka接続にアクセスすることができなくなります。

Kafkaクラスタのメンテナンスを開始する前に、Oracle SQL access to Kafka管理者が、DBMS_KAFKA_ADM.DISABLE_CLUSTERプロシージャを使用してクラスタへのアクセスを無効にする必要があります。メンテナンスが完了したら、DBMS_KAFKA_ADM.ENABLE_CLUSTERプロシージャを使用してクラスタ・アクセスを再度有効にできます。

状態をMAINTENANCEに変更できない場合、このプロシージャは例外を生成します

次の例では、ExampleClusterのクラスタ定義が一時的に無効になります。この状態では、ビューがKafkaクラスタに接続したり、Kafkaクラスタからデータを取得することはできません:

EXEC SYS.DBMS_KAFKA_ADM.DISABLE_CLUSTER (‘ExampleCluster’);

103.8 ENABLE_CLUSTER

DBMS_KAFKA_ADMのプロシージャENABLE_CLUSTERは、Kafkaクラスタを有効にします。

ノート:

このプロシージャが正常に完了すると、アプリケーションの状態がCONNECTEDに変更されます。

構文

PROCEDURE ENABLE_CLUSTER(
                           cluster_name    IN VARCHAR2
  );

パラメータ

表103-5 DBMS_KAFKA_ADMのENABLE_CLUSTERプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません。

使用上のノート

Kafkaクラスタのメンテナンスを完了したら、Oracle SQL access to Kafka管理者が、DBMS_KAFKA_ADM.ENABLE_CLUSTERプロシージャを使用してクラスタへのアクセスを有効にする必要があります。

状態をCONNECTEDに変更できない場合、このプロシージャは例外を生成します。

メンテナンスを実行するためにクラスタExampleClusterを無効にしていたので、ビューが再びそのKafkaクラスタに接続してデータを取得できるようにするとします。次のコマンドを入力します。

EXEC SYS.DBMS_KAFKA_ADM.ENABLE_CLUSTER (‘ExampleCluster’);

103.9 REGISTER_CLUSTER

DBMS_KAFKA_ADMのプロシージャREGISTER_CLUSTERは、Kafkaクラスタを登録し、Oracle Databaseでアクセスできるようにします。

構文

FUNCTION REGISTER_CLUSTER (
                       cluster_name             IN VARCHAR2,
                       startup_servers          IN VARCHAR2,
                       kafka_provider_provider  IN VARCHAR2,
                       cluster_access_dir       IN VARCHAR2,
                       credential_name          IN VARCHAR2 DEFAULT NULL,
                       cluster_config_dir       IN VARCHAR2 DEFAULT NULL,
                       cluster_description      IN VARCHAR2 DEFAULT NULL,
                       options                  IN CLOB DEFAULT NULL
)  RETURN INTEGER;

パラメータ

表103-6 DBMS_KAFKA_ADMのREGISTER_CLUSTERプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません

bootstrap_servers

Kafkaクラスタの起動サーバー

大/小文字が区別されます

kafka_provider_provider

Kafkaサーバーのプロバイダ(ApacheまたはOracle Cloud Infrastructure (OCI) Oracle Streaming Service (OSS)のいずれか)。構文: DBMS_KAFKA_ADM.KAFKA_PROVIDER_provider

providerの値: [APACHE|OSS]

大/小文字は区別されません

DBMS_KAFKA_ADMの定数KAFKA_PROVIDERも参照してください

cluster_access_dir

このクラスタへのアクセスを決定するためのOracleディレクトリ・オブジェクト

大/小文字は区別されません

credential_name

Kafkaに接続するためのパスワードに関連付けられた資格証明名。使用可能な名前の値および資格証明の作成方法については、SYS.DBMS_CREDENTIALを参照してください。Kafka接続にパスワードが必要な場合は指定する必要があります。それ以外の場合はNULLです

cluster_config_dir

セキュアなクラスタに必要なクラスタ構成ファイルを含むOracleディレクトリ・オブジェクト。OSSまたはセキュアでないクラスタの場合、値はNULLです。

大/小文字は区別されません

cluster_description

(オプション)クラスタのテキスト説明。最大長: 400文字。

options

(オプション)クラスタ・オプション。書式はJSON形式のドキュメントです。使用可能なオプションについては、SYS.DBMS_KAFKA_ADM.OPT_%を参照してください

使用上のノート

REGISTER_CLUSTERは、クラスタ定義の格納後にCHECK_CLUSTERコールを自動的に実行します。CHECK_CLUSTERコールから戻されたステータスが、REGISTER_CLUSTERからの戻り値となります。状態を判別できない場合、このプロシージャは例外を生成します。

Oracle SQL access to Kafka (OSAK)クラスタのライフサイクルの終了時に、管理者は、DBMS_KAFKA_ADM.DEREGISTER_CLUSTERファンクションを使用してOSAKクラスタを削除できます。

次の例では、REGISTER_CLUSTERを使用したファンクション要求によって、Apacheをプロバイダとして使用するサーバーmykafkastartupにあるKafkaクラスタKAFKACLUS1が登録されます。Oracle Databaseインスタンスのクラスタ・アクセス・ディレクトリはOSAK_KAFKACLUS1_ACCESSであり、資格証明KAFKACLUS1CRED1でアクセスされ、クラスタ構成ディレクトリOSAK_KAFKACLUS1_CONFIGを使用します。このクラスタの説明(オプション)は、"My test cluster kafkaclus1"です。ユーザー(examplekafka-user)には、 OSAK_KAFKACLUS1_ACCESSにあるKafkaクラスタ・データへのREADアクセス権が付与されます。

SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER (‘KAFKACLUS1’,
                                   ‘mykafkastartup-host:9092’, 
                                   DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, 
                                   ‘OSAK_KAFKACLUS1_ACCESS’
                                   ‘KAFKACLUS1CRED1’, 
                                   ‘OSAK_KAFKACLUS1_CONFIG’,
                                   'My test cluster kafkaclus1’) from dual;

Output: Successful registration return 0 (zero)
SQL> DBMS_KAFKA_ADM_RE...
             0


        SQL> grant read on directory OSAK_KAFKACLUS1_ACCESS to examplekafka-user;

セキュアでないクラスタ・アクセスを作成することもできます。次の例では、Oracle SQL access to Kafka (OSAK)を使用して、セキュアでないKafkaクラスタ・アクセスが登録されます。

  1. クラスタ・アクセス・データベース・ディレクトリは、空のパスで作成されます。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。

    SQL> CREATE DIRECTORY 
    OSAK_KAFKACLUS2_ACCESS AS '';
  2. パス構造Oracleベース/osak/cluster_name/configを使用するクラスタ構成オペレーティング・システム・ディレクトリが、対応するOracleディレクトリ・オブジェクトとともに作成されます(Oracleベースは/u01/app/oracle、Kafkaクラスタ名はkafkaclus2です):

    $mkdir /u01/app/oracle/osak/kafkaclus2/config;
    .
    .
    .
    SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_CONFIG AS 'u01/app/oracle/osak/kafkaclus2/config';
  3. 空のosakafka.propertiesファイルを作成するか、企業での必要に応じて決定した、OSAKチューニングまたはデバッグ・プロパティを指定してosakafka.propertiesファイルを作成します。

  4. DBMS_KAKFA_ADM.REGISTER_CLUSTER()を使用してKafkaクラスタを登録します。次に例を示します。

    SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER (
              cluster_name  => ‘KAFKACLUS2’,
              bootstrap_servers =>‘mykafkastartup-host:9092’, 
              kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, 
              cluster_access_dir => ‘OSAK_KAFKACLUS2_ACCESS’,
              credential_name => NULL,
              cluster_config_dir => ‘OSAK_KAFKACLUS2_CONFIG’,
               cluster_description =>  'My test cluster kafkaclus2’,
               options  => NULL) 
    from dual;
    Output: Successful registration return 0 (zero)
    SQL> DBMS_KAFKA_ADM_RE…..
                 0
    
  5. ユーザーに読取りアクセス権を付与します。この例では、ユーザーexamplekafka-user2にアクセス権が付与されます。

    SQL> grant read on directory 
    osak_kafkaclus2_access to examplekafka-user2;

103.10 UPDATE_CLUSTER_INFO

DBMS_KAFKA_ADMのプロシージャUPDATE_CLUSTER_INFOは、Kafkaホスト・サーバー、またはKafkaクラスタのオプション(あるいはその両方)を更新します。

構文

PROCEDURE UPDATE_CLUSTER_INFO(
                       cluster_name         IN VARCHAR2,
                       startup_servers    IN VARCHAR2 DEFAULT NULL,
                       options              IN CLOB DEFAULT NULL
 );

パラメータ

表103-7 DBMS_KAFKA_ADMのUPDATE_CLUSTER_INFOプロシージャのパラメータ

パラメータ 説明

cluster_name

既存のKafkaクラスタの名前

大/小文字は区別されません

startup_servers

Kafkaクラスタの起動サーバー

大/小文字が区別されます

これはKafkaソース構成の一部であるため、この文字列は不透明値として扱われ(そのまま使用され)、Kafkaクラスタ接続ロジックに渡されます。

options

(オプション)クラスタ・オプション。書式はJSON形式のドキュメントです。使用可能なオプションについては、SYS.DBMS_KAFKA_ADM.OPT_%を参照してください

使用上のノート

UPDATE_CLUSTER_INFOプロシージャは、起動サーバー・リストまたはクラスタ・オプション(あるいはその両方)を含むKafkaクラスタ定義を更新します。また、Kafkaサーバーから切断します。Kafkaクラスタ環境が変更され、クラスタ定義および構成を変更する必要がある場合、Oracle SQL access to Kafka (OSAK)管理者として、このプロシージャを使用します。

更新に失敗すると、このプロシージャは例外を生成します。

次の例では、Kafka管理者が、サーバーnewhostおよびhost2を使用するようにクラスタを再構成したので、KafkaクラスタExampleClusterの起動サーバーのリストを更新します。

EXEC SYS.DBMS_KAFKA_ADM.UPDATE_CLUSTER_INFO 
(‘ExampleCluster’,

‘newhost:9092,host2:9092’);