Oracle Streamsによって、キュー、伝播、エンキュー機能およびデキュー機能を使用したメッセージングが有効になります。キューには、ANYDATAキューまたは型付きキューを使用できます。ANYDATAキューでは、ペイロードの型がANYDATAのユーザー・メッセージがステージングされます。ANYDATAペイロードは、様々なデータ型のペイロードのラッパーとして使用できます。型付きキューでは、特定のタイプのメッセージをステージングできます。
次の各項では、Oracle Streamsメッセージ環境の構成について説明します。
この章で説明する各タスクは、特に明記されていないかぎり、適切な権限を付与されているOracle Streams管理者が完了する必要があります。この章の例では、各データベースのOracle Streams管理者が構成済であると想定されています。
|
関連項目:
|
ANYDATAペイロードのほとんどすべての型のペイロードはラップできます。次の各項では、ANYDATAキューに対するメッセージのエンキューおよびデキューの例を示します。
次に、ANYDATAペイロードの様々な型のペイロードをラップする手順を示します。
SQL*Plusで、ユーザーの作成、権限の付与、表領域の作成およびユーザーの変更を実行できる管理者ユーザーとして、dbs1.example.comデータベースに接続します。
SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。
DBMS_AQパッケージのEXECUTE権限をoeユーザーに付与し、このユーザーがこのパッケージのENQUEUEプロシージャおよびDEQUEUEプロシージャを実行できるようにします。
GRANT EXECUTE ON DBMS_AQ TO oe;
dbs1.example.comデータベースにOracle Streams管理者として接続します。
ANYDATAキューが存在しない場合は作成します。
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'oe_q_table_any',
queue_name => 'oe_q_any',
queue_user => 'oe');
END;
/
oeユーザーは、oe_q_anyキューの保護キュー・ユーザーとして自動的に構成され、このキューのENQUEUE権限およびDEQUEUE権限を付与されます。また、oeという名前のOracle Streams AQエージェントが構成され、oeユーザーに関連付けられます。ただし、メッセージをデキューできるサブスクライバが構成されないかぎり、メッセージをエンキューすることはできません。
oe_q_anyキューのサブスクライバを追加します。このサブスクライバによって、メッセージの明示的デキューが実行されます。
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('OE', NULL, NULL);
SYS.DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.oe_q_any',
subscriber => subscriber);
END;
/
dbs1.example.comデータベースにoeユーザーとして接続します。
ANYDATA型のオブジェクトを入力パラメータとして使用し、ペイロードが含まれているメッセージを既存のANYDATAキューにエンキューするプロシージャを作成します。
CREATE OR REPLACE PROCEDURE oe.enq_proc (payload ANYDATA)
IS
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
BEGIN
mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL);
DBMS_AQ.ENQUEUE(
queue_name => 'strmadmin.oe_q_any',
enqueue_options => enqopt,
message_properties => mprop,
payload => payload,
msgid => enq_msgid);
END;
/
適切なConvertdata_typeファンクションを指定することによって、手順7で作成したプロシージャを実行します。次のコマンドでは様々な型のメッセージをエンキューします。
VARCHAR2型:
EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW'));
COMMIT;
NUMBER型:
EXEC oe.enq_proc(ANYDATA.ConvertNumber('16'));
COMMIT;
ユーザー定義型:
BEGIN
oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
'1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
END;
/
COMMIT;
次に、ANYDATAペイロードでラップされるペイロードをデキューする手順を示します。この例では、「ANYDATAラッパーでのユーザー・メッセージ・ペイロードのラップおよびそれらのエンキュー」で説明されている手順が完了済であると想定されています。
メッセージをデキューするには、メッセージのコンシューマがわかっている必要があります。キュー内のメッセージのコンシューマを確認するには、キューの所有者として接続し、AQ$queue_table_nameを問い合せます。queue_table_nameはキュー表の名前です。たとえば、oe_q_anyキュー内のメッセージのコンシューマを確認するには、次の問合せを実行します。
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;
SQL*Plusで、データベースにoeユーザーとして接続します。
SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。
デキューするメッセージのコンシューマが入力として使用されるプロシージャを作成します。次の例のプロシージャでは、oe.cust_address_typのメッセージがデキューされ、メッセージのコンテンツが出力されます。
CREATE OR REPLACE PROCEDURE oe.get_cust_address (
consumer IN VARCHAR2) AS
address OE.CUST_ADDRESS_TYP;
deq_address ANYDATA;
msgid RAW(16);
deqopt DBMS_AQ.DEQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
new_addresses BOOLEAN := TRUE;
next_trans EXCEPTION;
no_messages EXCEPTION;
pragma exception_init (next_trans, -25235);
pragma exception_init (no_messages, -25228);
num_var pls_integer;
BEGIN
deqopt.consumer_name := consumer;
deqopt.wait := 1;
WHILE (new_addresses) LOOP
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'strmadmin.oe_q_any',
dequeue_options => deqopt,
message_properties => mprop,
payload => deq_address,
msgid => msgid);
deqopt.navigation := DBMS_AQ.NEXT;
DBMS_OUTPUT.PUT_LINE('****');
IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
deq_address.GetTypeName());
num_var := deq_address.GetObject(address);
DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
DBMS_OUTPUT.PUT_LINE(address.street_address);
DBMS_OUTPUT.PUT_LINE(address.postal_code);
DBMS_OUTPUT.PUT_LINE(address.city);
DBMS_OUTPUT.PUT_LINE(address.state_province);
DBMS_OUTPUT.PUT_LINE(address.country_id);
ELSE
DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
deq_address.GetTypeName());
END IF;
COMMIT;
EXCEPTION
WHEN next_trans THEN
deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
WHEN no_messages THEN
new_addresses := FALSE;
DBMS_OUTPUT.PUT_LINE('No more messages');
END;
END LOOP;
END;
/
手順1で作成したプロシージャを実行し、デキューするメッセージのコンシューマを指定します。次に例を示します。
SET SERVEROUTPUT ON SIZE 100000
EXEC oe.get_cust_address('OE');
この項では、次の要素をデータベースで構成する方法について説明します。
データベースのANYDATAキューにメッセージをエンキューするエンキュー・プロシージャ。この例では、エンキュー・プロシージャでトリガーを使用して、oe.orders表に行が挿入されるたびにメッセージをエンキューします。
ルールに基づいて永続LCRおよび永続ユーザー・メッセージをデキューできるメッセージ・クライアント。この例では、メッセージ・クライアントでルールを使用し、oe.orders表に関連するメッセージのみをデキューします。メッセージ・クライアントでDBMS_STREAMS_MESSAGINGのDEQUEUEプロシージャを使用して、メッセージを1つずつデキューし、注文の注文番号を表示します。
メッセージ・クライアントに対するメッセージ通知。この例では、メッセージ・クライアントで使用されるキューにメッセージがエンキューされると、電子メール・アドレスに通知が送信されます。このメッセージは、メッセージ・クライアントのルール・セットを満たすため、メッセージ・クライアントによってデキューできます。また、対象メッセージがエンキューされた場合にアプリケーションにアラートを通知するメッセージ通知を構成することもできます。これらのアプリケーションでは、カスタマイズされた方法でメッセージをデキューおよび処理できます。通知は、指定したHTTPポストに送信することもできます。
DBA_STREAMS_MESSAGE_CONSUMERSデータ・ディクショナリ・ビューを問い合せると、既存のメッセージ・クライアントおよび通知の詳細を参照できます。
メッセージ・クライアントおよびメッセージ通知を構成するには、次の手順を実行します。
SQL*Plusで、権限を付与でき、提供されているパッケージ内のサブプログラムを実行できる管理ユーザーとして接続します。
SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。
電子メールを送信する場合に使用されるホスト名、メール・ポート、およびDBMS_AQELMパッケージを使用して電子メール通知の電子メール・メッセージを送信する電子メール・アカウントを設定します。次の例では、メール・ホスト名をsmtp.example.com、メール・ポートを25、電子メール・アカウントをMary.Smith@example.comに設定します。
BEGIN
DBMS_AQELM.SET_MAILHOST('smtp.example.com') ;
DBMS_AQELM.SET_MAILPORT(25) ;
DBMS_AQELM.SET_SENDFROM('Mary.Smith@example.com');
END;
/
正しいメール・ホスト、メール・ポートおよび送信元電子メール・アドレスがわからない場合は、データベースを実行しているコンピュータ・システムのシステム管理者に問い合せてください。
メッセージ・クライアントの作成、メッセージのエンキューとデキュー、およびメッセージ通知の指定を行うユーザーに対して必要な権限を付与します。この例では、oeユーザーがこれらすべてのタスクを実行します。
GRANT EXECUTE ON DBMS_AQ TO oe;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO oe;
GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe;
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'oe',
grant_option => FALSE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'oe',
grant_option => FALSE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'oe',
grant_option => FALSE);
END;
/
データベースにoeユーザーとして接続します。
次の例のように入力して、SET_UP_QUEUEを使用してANYDATAキューを作成します。
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'oe.notification_queue_table',
queue_name => 'oe.notification_queue');
END;
/
次の例のように入力して、ユーザー・メッセージの型を作成します。
CREATE TYPE oe.user_msg AS OBJECT( object_name VARCHAR2(30), object_owner VARCHAR2(30), message VARCHAR2(50)); /
次の例のように入力して、注文がoe.orders表に挿入されるたびにメッセージをキューにエンキューするトリガーを作成します。
CREATE OR REPLACE TRIGGER oe.order_insert AFTER INSERT
ON oe.orders FOR EACH ROW
DECLARE
msg oe.user_msg;
str VARCHAR2(2000);
BEGIN
str := 'New Order - ' || :NEW.ORDER_ID || ' Order ID';
msg := oe.user_msg(
object_name => 'ORDERS',
object_owner => 'OE',
message => str);
DBMS_STREAMS_MESSAGING.ENQUEUE (
queue_name => 'oe.notification_queue',
payload => ANYDATA.CONVERTOBJECT(msg));
END;
/
次の例のように入力して、キューからメッセージをデキューするメッセージ・クライアント、およびデキューするメッセージを判断する場合にそのメッセージ・クライアントで使用されるルールを作成します。
BEGIN
DBMS_STREAMS_ADM.ADD_MESSAGE_RULE (
message_type => 'oe.user_msg',
rule_condition => ' :msg.OBJECT_OWNER = ''OE'' AND ' ||
' :msg.OBJECT_NAME = ''ORDERS'' ',
streams_type => 'dequeue',
streams_name => 'oe',
queue_name => 'oe.notification_queue');
END;
/
次の例のように入力して、メッセージ・クライアントでデキュー可能なメッセージのエンキュー時に電子メールを送信するメッセージ通知を設定します。
BEGIN
DBMS_STREAMS_ADM.SET_MESSAGE_NOTIFICATION (
streams_name => 'oe',
notification_action => 'Mary.Smith@example.com',
notification_type => 'MAIL',
include_notification => TRUE,
queue_name => 'oe.notification_queue');
END;
/
次の例のように入力して、メッセージ・クライアントを使用してメッセージをデキューするPL/SQLプロシージャを作成します。
CREATE OR REPLACE PROCEDURE oe.deq_notification(consumer IN VARCHAR2) AS msg ANYDATA; user_msg oe.user_msg; num_var PLS_INTEGER; more_messages BOOLEAN := TRUE; navigation VARCHAR2(30); BEGIN navigation := 'FIRST MESSAGE'; WHILE (more_messages) LOOP BEGIN DBMS_STREAMS_MESSAGING.DEQUEUE( queue_name => 'oe.notification_queue', streams_name => consumer, payload => msg, navigation => navigation, wait => DBMS_STREAMS_MESSAGING.NO_WAIT); IF msg.GETTYPENAME() = 'OE.USER_MSG' THEN num_var := msg.GETOBJECT(user_msg); DBMS_OUTPUT.PUT_LINE(user_msg.object_name); DBMS_OUTPUT.PUT_LINE(user_msg.object_owner); DBMS_OUTPUT.PUT_LINE(user_msg.message); END IF; navigation := 'NEXT MESSAGE'; COMMIT; EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN navigation := 'NEXT TRANSACTION'; WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN more_messages := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages.'); WHEN OTHERS THEN RAISE; END; END LOOP; END; /
次の例のように入力して、oe.orders表に行を挿入します。
INSERT INTO oe.orders VALUES(2521, 'direct', 144, 0, 922.57, 159, NULL); INSERT INTO oe.orders VALUES(2522, 'direct', 116, 0, 1608.29, 153, NULL); COMMIT; INSERT INTO oe.orders VALUES(2523, 'direct', 116, 0, 227.55, 155, NULL); COMMIT;
メッセージ通知によって、エンキューされた各メッセージに対して手順9で指定した電子メール・アドレスにメッセージが送信されます。各通知は、AQXmlNotificationです。この通知には、次のものが含まれています。
notification_options: 次のものが含まれています。
destination: メッセージのデキュー元の宛先キュー
consumer_name - メッセージをデキューしたメッセージ・クライアントの名前
message_set - メッセージ・プロパティのセット
次に、電子メール通知で送信されるAQXmlNotificationの書式の例を示します。
<?xml version="1.0" encoding="UTF-8"?>
<Envelope xmlns="http://ns.oracle.com/AQ/schemas/envelope">
<Body>
<AQXmlNotification xmlns="http://ns.oracle.com/AQ/schemas/access">
<notification_options>
<destination>OE.NOTIFICATION_QUEUE</destination>
<consumer_name>OE</consumer_name>
</notification_options>
<message_set>
<message>
<message_header>
<message_id>CB510DDB19454731E034080020AE3E0A</message_id>
<expiration>-1</expiration>
<delay>0</delay>
<priority>1</priority>
<delivery_count>0</delivery_count>
<sender_id>
<agent_name>OE</agent_name>
<protocol>0</protocol>
</sender_id>
<message_state>0</message_state>
</message_header>
</message>
</message_set>
</AQXmlNotification>
</Body>
</Envelope>
oe.deq_notificationプロシージャを次のように実行すると、この例でエンキューされたメッセージをデキューできます。
SET SERVEROUTPUT ON SIZE 100000
EXEC oe.deq_notification('OE');
|
注意: DBMS_AQパッケージを使用して、通知を構成することもできます。DBMS_AQパッケージでは、DBMS_STREAMS_ADMパッケージでは使用できない通知機能(バッファ済メッセージの通知、時間による通知のグループ化など)が提供されています。 |
|
関連項目:
|