103 DBMS_KAFKA_ADM
DBMS_KAFKA_ADM
パッケージは、後でアプリケーションのKafkaクラスタ・データへのアクセス権を付与するために使用できるクラスタ定義を作成するためのPL/SQLインタフェースを提供します。
OSAK_ADMIN_ROLE
を付与された管理者は、DBMS_KAFKA_ADM
パッケージを使用して、Oracle Databaseのビューおよび表からKafkaデータを問い合せるアプリケーションを作成できます。
103.1 DBMS_KAFKA_ADMの概要
DBMS_KAFKA_ADM
パッケージを使用すると、KafkaクラスタへのOracle SQLアクセスを管理できます。
Kafkaトピックへのアクセスを設定するには、OSAK_ADMIN_ROLE
で付与される権限が必要です。これらの権限を使用して、Kafkaクラスタに接続するときにリソースを管理するために役立つクラスタ定義を指定します。ユーザーがクラスタ定義を共有できるため、基礎となるフレームワークでクラスタへの接続を共有できます。接続を共有すると、メモリーおよびネットワーク・リソースを節約できます。
103.2 DBMS_KAFKA_ADMのセキュリティ・モデル
Oracle SQL Access to Kafkaの管理者ユーザーにターゲットOracle DatabaseのOSAK_ADMIN_ROLE
を付与することをお薦めします。
OSAK_ADMIN_ROLE
を使用すると、Oracle SQL access to Kafkaアプリケーションを作成および管理するために必要なシステム権限を付与するプロセスが簡略化されます。また、OSAK_ADMIN_ROLE
を持つ管理者は、アクセス権を必要とする関連するKafkaビューおよび表があるOracle Databaseオブジェクトへのアクセス権を他のユーザーに付与します。
OSAK_ADMIN_ROLE
に加えて、Oracle SQL access to Kafkaアプリケーションの管理者は次のシステム権限を持っている必要があります。
CREATE SESSION
ALTER SESSION
CREATE CREDENTIAL
(Kafka SASL-SSL (Simple Authentication and Security Layer)パスワードまたはOSS (Oracle Streaming Service) authTokenを作成するため)CREATE ANY DIRECTORY
(クラスタ・アクセスおよびクラスタ構成ディレクトリを作成するため)DROP ANY DIRECTORY
(クラスタ・アクセスおよびクラスタ構成ディレクトリを削除するため)sys.dbms_kafka_clusters
に対するREAD ON
権限sys.dbms_kafka_applications
に対するREAD ON
権限sys.dbms_kafka_messages
に対するREAD ON
権限sys.dbms_kafka_adm
に対するEXECUTE ON
権限
103.3 DBMS_KAFKA_ADMの定数
DBMS_KAFKA_ADM
パッケージはステートフル・パッケージであり、ここで説明する定数をパッケージ仕様部で宣言されたパッケージ状態の一部として使用します。
DBMS_KAFKA_ADM
の定数には、リテラル初期値を持つ定数とパッケージ・ビルドで指定する定数が含まれます。
Kafkaプロバイダ
Kafkaサーバーのプロバイダは、ApacheまたはOracle Cloud Infrastructure (OCI) Oracle Streaming Service (OSS)のいずれかです。KAFKA_PROVIDER_APACHE CONSTANT VARCHAR2(6) := 'APACHE';
KAFKA_PROVIDER_OSS CONSTANT VARCHAR2(3) := 'OSS';
クラスタ定義の状態
STATE_CONNECTED CONSTANT INTEGER := 0;
STATE_MAINTENANCE CONSTANT INTEGER := 1;
STATE_BROKEN CONSTANT INTEGER := 2;
STATE_DEREGISTERED CONSTANT INTEGER := 3;
Kafka接続モード
Kafka接続モードでは、次のいずれかの接続モード定数を指定します。
-
高スループット
接続モード(
connmode
)がhigh_throughput
の場合、この接続に関連付けられているアプリケーションでは、すべてのデータを可能なかぎり高速に配信する必要があります。この接続モードは、STREAMINGおよびLOADINGアプリケーションから使用する場合に最適です。例:
CONNECTION_MODE_HI_THRU CONSTANT VARCHAR2(15) := ‘high_throughput’;
-
低レイテンシ
接続モード(
connmode
)がlow_latency
の場合、基礎となるレイヤーは、可能なかぎり高速に最初の行を戻そうとします。この接続モードは、それぞれのロードで少量のKafkaデータのみを使用するアプリケーションに最も有効です。例:
CONNECTION_MODE_LO_LAT CONSTANT VARCHAR2(11) := ‘low_latency’;
-
オプション
OPT_CONNECTION_MODE CONSTANT VARCHAR2(30) := ‘connmode’;
103.4 DBMS_KAFKA_ADMのプロシージャの要約
DBMS_KAFKA_ADM
のプロシージャを使用すると、KafkaへのOracle SQLアクセスを作成、削除および構成できます。
Kafkaプロバイダ
Kafkaプロバイダ定数を使用して、Kafkaクラスタを定義します。Apache KafkaベースのクラスタとOracle Streaming Serviceベースのクラスタではセキュリティ・モデルが異なるため、正しいセキュア接続パラメータを渡すことができるように、接続するKafka環境のタイプを定義する必要があります。例:
KAFKA_PROVIDER_APACHE CONSTANT VARCHAR2(6) := 'APACHE';
KAFKA_PROVIDER_OSS CONSTANT VARCHAR2(3) := 'OSS';
クラスタ定義の状態
STATE_CONNECTED CONSTANT INTEGER := 0;
STATE_MAINTENANCE CONSTANT INTEGER := 1;
STATE_BROKEN CONSTANT INTEGER := 2;
STATE_DEREGISTERED CONSTANT INTEGER := 3;
Kafka接続モード
Kafka接続モードでは、レコードのバッチを読み取るためにOracle SQL access to Kafkaが従うKafkaルールを指定します。アプリケーションの要件に応じて、リソースをより効率的に使用できるのはどちらのオプションであるかに基づいて、低レイテンシ・ルールまたは高スループット・ルールのいずれかを指定できます。接続を作成するときには、スループットとレイテンシの間のトレードオフを考慮する必要があります。レコードが生成された時間と、そのデータをアプリケーションで使用できるようにする時間の間のレイテンシによって、低レイテンシを優先するか、高スループットを優先するかを決定する必要があります。
アプリケーションがほぼリアルタイムのデータ・アクセスを必要とするため、Kafkaクラスタによってデータが生成されてから数秒または1分以内にアプリケーションがデータに反応できるようにする場合は、低レイテンシ接続を選択する必要があります。低レイテンシ接続では、Oracle SQL Kafkaの接続バッファがより小さいサイズに設定され、Kafkaトピックへの読取りが頻繁に行われるため、レコードがKafkaトピックに対して生成されてから、Oracle SQL access to Kafkaを使用してアプリケーションによって使用されるまでの遅延は、アプリケーションのパフォーマンス要件内です。Kafkaトピックからのそれぞれの読取りは、比較的少数のレコードで構成されると予想され、Kafkaクラスタからの読取りの回数は多くなります。この形式のKafka接続は、ほぼリアルタイムの同時トランザクションでアプリケーションを更新する場合、つまりアプリケーションが変更を監視してすばやく反応する必要がある場合に特に適しています。ただし、各トランザクションで受け取るデータの量に対してトランザクションの数が比較的多いため、この構成はリソースを大量に消費する可能性があります。
アプリケーションにほぼリアルタイムの要件がないため、レコードの大きなバッチを処理することを優先して更新を遅らせ、15分または1時間という頻度で、あるいは8時間ごとに更新を行うことができるようにする場合は、高スループット接続の方が効率的な選択肢となります。高スループットに構成された接続では、Kafkaデータの各バッチがストリームされる頻度は低くなりますが、各トランザクションでストリームされるデータの量は多くなります。したがって、Oracle SQL access to Kafkaのバッファ・サイズが大きくなり、こうした大きなバッチに対応するようにネットワークI/Oおよびメモリーの割当てが構成されます。これらのトランザクションがストリームされる頻度は低くなるため、Oracle SQL access to Kafkaトランザクションによって消費されるサーバー・リソースの総量は少なくなります
-
低レイテンシ
CONNECTION_MODE_LO_LAT CONSTANT VARCHAR2(11) := ‘low_latency’;
-
高スループット
CONNECTION_MODE_HI_THRU CONSTANT VARCHAR2(15) := ‘high_throughput’;
-
オプション
OPT_CONNECTION_MODE CONSTANT VARCHAR2(30) := ‘connmode’;
次の表では、DBMS_KAFKA_ADMのプロシージャを示し、それらについて簡単に説明します。
表103-1 DBMS_KAFKA_ADMパッケージのプロシージャ
サブプログラム | 説明 |
---|---|
Kafkaクラスタの接続をテストします。 |
|
Kafkaクラスタを登録解除します。 |
|
Kafkaクラスタを無効にします。 |
|
Kafkaクラスタを有効にします。 |
|
Kafkaクラスタを登録します。 |
|
ホスト・サーバーまたはKafkaクラスタ・オプション(あるいはその両方)を更新します。 |
103.5 CHECK_CLUSTER
DBMS_KAFKA_ADM
のプロシージャCHECK_CLUSTER
は、Kafkaクラスタの接続をテストします。このプロシージャを使用して、構成されたセキュリティ情報で接続を確立できることを確認します。このファンクションは、クラスタの状態を戻します。
構文
FUNCTION CHECK_CLUSTER(
cluster_name IN VARCHAR2
) RETURN INTEGER;
パラメータ
表103-2 DBMS_KAFKA_ADMのCHECK_CLUSTERプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
使用上のノート
Oracle SQL access to Kafkaクラスタの管理者として、クラスタの接続をチェックする必要があります。接続をチェックすることで、Kafkaクラスタについて指定したブートストラップ・サーバー・リストおよびセキュリティ関連の構成が適切であり、接続を正常に確立できることが確認されます。REGISTER_CLUSTER
を実行し、セキュリティ構成が確立されたときに、このファンクションを使用して接続をテストできます。クラスタ接続が検証されたら、クラスタ定義をユーザー・アプリケーションで使用できるようにすることができます。
また、DBMS_KAFKA.LOAD_TEMP_TABLE
およびDBMS_KAFKA.EXECUTE_LOAD_APP
(どちらもOracle SQL access to Kafkaビューから選択する)をコールしたときにユーザーが問題を報告した場合にも、このファンクションを使用します。
例
クラスタExampleClusterの初期クラスタ定義を完了し、クラスタを登録したとします。次に、このプロシージャを使用して構成をチェックします。
EXEC SYS.DBMS_OUTPUT.PUT_LINE (
SYS.DBMS_KAFKA_ADM.CHECK_CLUSTER (‘ExampleCluster’));
103.6 DEREGISTER_CLUSTER
DBMS_KAFKA_ADM
のプロシージャDEREGISTER_CLUSTER
は、Kafkaクラスタを登録解除します。
ノート:
このプロシージャがクラスタを登録解除して削除するのは、クラスタに関連付けられているユーザー・アプリケーションがない場合のみです構文
PROCEDURE DEREGISTER_CLUSTER (
cluster_name IN VARCHAR2,
forced IN BOOLEAN DEFAULT FALSE
);
パラメータ
表103-3 DBMS_KAFKA_ADMのDEREGISTER_CLUSTERプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
forced |
(オプション)ユーザー・アプリケーションがまだ定義されている場合でも、クラスタを登録解除済としてマークします。デフォルト: 値: |
使用上のノート
クラスタ定義が不要になった場合、OSAK管理者はクラスタ定義を削除できます。
forced
パラメータがTRUE
として渡された場合、クラスタの状態は登録解除済としてマークされます。このクラスタにまだ関連付けられているアプリケーションは機能しなくなります。最後のアプリケーションが削除されると、登録解除されたこのクラスタ定義も削除されます。
例
次の例では、KafkaクラスタExampleClusterの未使用のクラスタ定義が登録解除されます。
EXEC SYS.DBMS_KAFKA_ADM.DEREGISTER_CLUSTER (‘ExampleCluster’);
クラスタExampleClusterに関連付けられているユーザー・アプリケーション(ビューおよび表)がまだあるが、クラスタを登録解除するとします。次の例では、forcedパラメータをTRUEに設定して、クラスタExampleClusterが登録解除されます。
exec DBMS_KAFKA_ADM.DEREGISTER_CLUSTER(
cluster_name => 'ExampleCluster',
forced => TRUE);
103.7 DISABLE_CLUSTER
DBMS_KAFKA_ADM
のプロシージャDISABLE_CLUSTER
は、Kafkaクラスタを無効にします。
ノート:
このプロシージャが正常に完了すると、アプリケーションの状態がMAINTENANCE
に変更されます。
構文
PROCEDURE DISABLE_CLUSTER(
cluster_name IN VARCHAR2
);
パラメータ
表103-4 DBMS_KAFKA_ADMのDISABLE_CLUSTERプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
使用上のノート
無効になったクラスタでは、クラスタのセキュリティ情報を取得できないため、Kafka接続を作成したり、Kafka接続にアクセスすることができなくなります。
Kafkaクラスタのメンテナンスを開始する前に、Oracle SQL access to Kafka管理者が、DBMS_KAFKA_ADM.DISABLE_CLUSTER
プロシージャを使用してクラスタへのアクセスを無効にする必要があります。メンテナンスが完了したら、DBMS_KAFKA_ADM.ENABLE_CLUSTER
プロシージャを使用してクラスタ・アクセスを再度有効にできます。
状態をMAINTENANCE
に変更できない場合、このプロシージャは例外を生成します
例
次の例では、ExampleClusterのクラスタ定義が一時的に無効になります。この状態では、ビューがKafkaクラスタに接続したり、Kafkaクラスタからデータを取得することはできません:
EXEC SYS.DBMS_KAFKA_ADM.DISABLE_CLUSTER (‘ExampleCluster’);
103.8 ENABLE_CLUSTER
DBMS_KAFKA_ADM
のプロシージャENABLE_CLUSTER
は、Kafkaクラスタを有効にします。
ノート:
このプロシージャが正常に完了すると、アプリケーションの状態がCONNECTED
に変更されます。
構文
PROCEDURE ENABLE_CLUSTER(
cluster_name IN VARCHAR2
);
パラメータ
表103-5 DBMS_KAFKA_ADMのENABLE_CLUSTERプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません。 |
使用上のノート
Kafkaクラスタのメンテナンスを完了したら、Oracle SQL access to Kafka管理者が、DBMS_KAFKA_ADM.ENABLE_CLUSTER
プロシージャを使用してクラスタへのアクセスを有効にする必要があります。
状態をCONNECTED
に変更できない場合、このプロシージャは例外を生成します。
例
メンテナンスを実行するためにクラスタExampleClusterを無効にしていたので、ビューが再びそのKafkaクラスタに接続してデータを取得できるようにするとします。次のコマンドを入力します。
EXEC SYS.DBMS_KAFKA_ADM.ENABLE_CLUSTER (‘ExampleCluster’);
103.9 REGISTER_CLUSTER
DBMS_KAFKA_ADM
のプロシージャREGISTER_CLUSTER
は、Kafkaクラスタを登録し、Oracle Databaseでアクセスできるようにします。
構文
FUNCTION REGISTER_CLUSTER (
cluster_name IN VARCHAR2,
startup_servers IN VARCHAR2,
kafka_provider_provider IN VARCHAR2,
cluster_access_dir IN VARCHAR2,
credential_name IN VARCHAR2 DEFAULT NULL,
cluster_config_dir IN VARCHAR2 DEFAULT NULL,
cluster_description IN VARCHAR2 DEFAULT NULL,
options IN CLOB DEFAULT NULL
) RETURN INTEGER;
パラメータ
表103-6 DBMS_KAFKA_ADMのREGISTER_CLUSTERプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません |
|
Kafkaクラスタの起動サーバー 大/小文字が区別されます |
|
Kafkaサーバーのプロバイダ(ApacheまたはOracle Cloud Infrastructure (OCI) Oracle Streaming Service (OSS)のいずれか)。構文:
大/小文字は区別されません
|
|
このクラスタへのアクセスを決定するためのOracleディレクトリ・オブジェクト 大/小文字は区別されません |
|
Kafkaに接続するためのパスワードに関連付けられた資格証明名。使用可能な名前の値および資格証明の作成方法については、 |
|
セキュアなクラスタに必要なクラスタ構成ファイルを含むOracleディレクトリ・オブジェクト。OSSまたはセキュアでないクラスタの場合、値は 大/小文字は区別されません |
|
(オプション)クラスタのテキスト説明。最大長: 400文字。 |
|
(オプション)クラスタ・オプション。書式はJSON形式のドキュメントです。使用可能なオプションについては、 |
使用上のノート
REGISTER_CLUSTER
は、クラスタ定義の格納後にCHECK_CLUSTER
コールを自動的に実行します。CHECK_CLUSTER
コールから戻されたステータスが、REGISTER_CLUSTER
からの戻り値となります。状態を判別できない場合、このプロシージャは例外を生成します。
Oracle SQL access to Kafka (OSAK)クラスタのライフサイクルの終了時に、管理者は、DBMS_KAFKA_ADM.DEREGISTER_CLUSTER
ファンクションを使用してOSAKクラスタを削除できます。
例
次の例では、REGISTER_CLUSTER
を使用したファンクション要求によって、Apacheをプロバイダとして使用するサーバーmykafkastartup
にあるKafkaクラスタKAFKACLUS1が登録されます。Oracle Databaseインスタンスのクラスタ・アクセス・ディレクトリはOSAK_KAFKACLUS1_ACCESS
であり、資格証明KAFKACLUS1CRED1
でアクセスされ、クラスタ構成ディレクトリOSAK_KAFKACLUS1_CONFIG
を使用します。このクラスタの説明(オプション)は、"My test cluster kafkaclus1"です。ユーザー(examplekafka-user
)には、 OSAK_KAFKACLUS1_ACCESS
にあるKafkaクラスタ・データへのREAD
アクセス権が付与されます。
SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER (‘KAFKACLUS1’,
‘mykafkastartup-host:9092’,
DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
‘OSAK_KAFKACLUS1_ACCESS’
‘KAFKACLUS1CRED1’,
‘OSAK_KAFKACLUS1_CONFIG’,
'My test cluster kafkaclus1’) from dual;
Output: Successful registration return 0 (zero)
SQL> DBMS_KAFKA_ADM_RE...
0
SQL> grant read on directory OSAK_KAFKACLUS1_ACCESS to examplekafka-user;
セキュアでないクラスタ・アクセスを作成することもできます。次の例では、Oracle SQL access to Kafka (OSAK)を使用して、セキュアでないKafkaクラスタ・アクセスが登録されます。
-
クラスタ・アクセス・データベース・ディレクトリは、空のパスで作成されます。このディレクトリは、KafkaクラスタにアクセスできるOracleユーザーを制御するために使用されます。
SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_ACCESS AS '';
-
パス構造
Oracleベース/
osak/cluster_name/configを使用するクラスタ構成オペレーティング・システム・ディレクトリが、対応するOracleディレクトリ・オブジェクトとともに作成されます(Oracleベースは/u01/app/oracle
、Kafkaクラスタ名はkafkaclus2
です):$mkdir /u01/app/oracle/osak/kafkaclus2/config; . . . SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_CONFIG AS 'u01/app/oracle/osak/kafkaclus2/config';
-
空の
osakafka.properties
ファイルを作成するか、企業での必要に応じて決定した、OSAKチューニングまたはデバッグ・プロパティを指定してosakafka.properties
ファイルを作成します。 -
DBMS_KAKFA_ADM.REGISTER_CLUSTER()
を使用してKafkaクラスタを登録します。次に例を示します。SQL> select DBMS_KAFKA_ADM.REGISTER_CLUSTER ( cluster_name => ‘KAFKACLUS2’, bootstrap_servers =>‘mykafkastartup-host:9092’, kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE, cluster_access_dir => ‘OSAK_KAFKACLUS2_ACCESS’, credential_name => NULL, cluster_config_dir => ‘OSAK_KAFKACLUS2_CONFIG’, cluster_description => 'My test cluster kafkaclus2’, options => NULL) from dual; Output: Successful registration return 0 (zero) SQL> DBMS_KAFKA_ADM_RE….. 0
-
ユーザーに読取りアクセス権を付与します。この例では、ユーザーexamplekafka-user2にアクセス権が付与されます。
SQL> grant read on directory osak_kafkaclus2_access to examplekafka-user2;
103.10 UPDATE_CLUSTER_INFO
DBMS_KAFKA_ADM
のプロシージャUPDATE_CLUSTER_INFO
は、Kafkaホスト・サーバー、またはKafkaクラスタのオプション(あるいはその両方)を更新します。
構文
PROCEDURE UPDATE_CLUSTER_INFO(
cluster_name IN VARCHAR2,
startup_servers IN VARCHAR2 DEFAULT NULL,
options IN CLOB DEFAULT NULL
);
パラメータ
表103-7 DBMS_KAFKA_ADMのUPDATE_CLUSTER_INFOプロシージャのパラメータ
パラメータ | 説明 |
---|---|
|
既存のKafkaクラスタの名前 大/小文字は区別されません |
|
Kafkaクラスタの起動サーバー 大/小文字が区別されます これはKafkaソース構成の一部であるため、この文字列は不透明値として扱われ(そのまま使用され)、Kafkaクラスタ接続ロジックに渡されます。 |
|
(オプション)クラスタ・オプション。書式はJSON形式のドキュメントです。使用可能なオプションについては、 |
使用上のノート
UPDATE_CLUSTER_INFO
プロシージャは、起動サーバー・リストまたはクラスタ・オプション(あるいはその両方)を含むKafkaクラスタ定義を更新します。また、Kafkaサーバーから切断します。Kafkaクラスタ環境が変更され、クラスタ定義および構成を変更する必要がある場合、Oracle SQL access to Kafka (OSAK)管理者として、このプロシージャを使用します。
更新に失敗すると、このプロシージャは例外を生成します。
例
次の例では、Kafka管理者が、サーバーnewhost
およびhost2
を使用するようにクラスタを再構成したので、KafkaクラスタExampleCluster
の起動サーバーのリストを更新します。
EXEC SYS.DBMS_KAFKA_ADM.UPDATE_CLUSTER_INFO
(‘ExampleCluster’,
‘newhost:9092,host2:9092’);