Oracle Streamsによって、キュー、伝播、エンキュー機能およびデキュー機能を使用したメッセージングが有効になります。キューには、ANYDATAキューまたは型付きキューを使用できます。ANYDATA
キューでは、ペイロードの型がANYDATA
のユーザー・メッセージがステージングされます。ANYDATA
ペイロードは、様々なデータ型のペイロードのラッパーとして使用できます。型付きキューでは、特定のタイプのメッセージをステージングできます。
次の各項では、Oracle Streamsメッセージ環境の構成について説明します。
この章で説明する各タスクは、特に明記されていないかぎり、適切な権限を付与されているOracle Streams管理者が完了する必要があります。この章の例では、各データベースのOracle Streams管理者が構成済であると想定されています。
関連項目:
|
ANYDATA
ペイロードのほとんどすべての型のペイロードはラップできます。次の各項では、ANYDATA
キューに対するメッセージのエンキューおよびデキューの例を示します。
次に、ANYDATA
ペイロードの様々な型のペイロードをラップする手順を示します。
SQL*Plusで、ユーザーの作成、権限の付与、表領域の作成およびユーザーの変更を実行できる管理者ユーザーとして、dbs1.example.com
データベースに接続します。
SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。
DBMS_AQ
パッケージのEXECUTE
権限をoe
ユーザーに付与し、このユーザーがこのパッケージのENQUEUE
プロシージャおよびDEQUEUE
プロシージャを実行できるようにします。
GRANT EXECUTE ON DBMS_AQ TO oe;
dbs1.example.com
データベースにOracle Streams管理者として接続します。
ANYDATA
キューが存在しない場合は作成します。
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe_q_table_any', queue_name => 'oe_q_any', queue_user => 'oe'); END; /
oe
ユーザーは、oe_q_any
キューの保護キュー・ユーザーとして自動的に構成され、このキューのENQUEUE
権限およびDEQUEUE
権限を付与されます。また、oe
という名前のOracle Streams AQエージェントが構成され、oe
ユーザーに関連付けられます。ただし、メッセージをデキューできるサブスクライバが構成されないかぎり、メッセージをエンキューすることはできません。
oe_q_any
キューのサブスクライバを追加します。このサブスクライバによって、メッセージの明示的デキューが実行されます。
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('OE', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_q_any', subscriber => subscriber); END; /
dbs1.example.com
データベースにoe
ユーザーとして接続します。
ANYDATA
型のオブジェクトを入力パラメータとして使用し、ペイロードが含まれているメッセージを既存のANYDATA
キューにエンキューするプロシージャを作成します。
CREATE OR REPLACE PROCEDURE oe.enq_proc (payload ANYDATA) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_q_any', enqueue_options => enqopt, message_properties => mprop, payload => payload, msgid => enq_msgid); END; /
適切なConvert
data_type
ファンクションを指定することによって、手順7で作成したプロシージャを実行します。次のコマンドでは様々な型のメッセージをエンキューします。
VARCHAR2
型:
EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW')); COMMIT;
NUMBER
型:
EXEC oe.enq_proc(ANYDATA.ConvertNumber('16')); COMMIT;
ユーザー定義型:
BEGIN oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ( '1646 Brazil Blvd','361168','Chennai','Tam', 'IN'))); END; / COMMIT;
次に、ANYDATA
ペイロードでラップされるペイロードをデキューする手順を示します。この例では、「ANYDATAラッパーでのユーザー・メッセージ・ペイロードのラップおよびそれらのエンキュー」で説明されている手順が完了済であると想定されています。
メッセージをデキューするには、メッセージのコンシューマがわかっている必要があります。キュー内のメッセージのコンシューマを確認するには、キューの所有者として接続し、AQ$
queue_table_name
を問い合せます。queue_table_name
はキュー表の名前です。たとえば、oe_q_any
キュー内のメッセージのコンシューマを確認するには、次の問合せを実行します。
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;
SQL*Plusで、データベースにoe
ユーザーとして接続します。
SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。
デキューするメッセージのコンシューマが入力として使用されるプロシージャを作成します。次の例のプロシージャでは、oe.cust_address_typ
のメッセージがデキューされ、メッセージのコンテンツが出力されます。
CREATE OR REPLACE PROCEDURE oe.get_cust_address ( consumer IN VARCHAR2) AS address OE.CUST_ADDRESS_TYP; deq_address ANYDATA; msgid RAW(16); deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; new_addresses BOOLEAN := TRUE; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); num_var pls_integer; BEGIN deqopt.consumer_name := consumer; deqopt.wait := 1; WHILE (new_addresses) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'strmadmin.oe_q_any', dequeue_options => deqopt, message_properties => mprop, payload => deq_address, msgid => msgid); deqopt.navigation := DBMS_AQ.NEXT; DBMS_OUTPUT.PUT_LINE('****'); IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); num_var := deq_address.GetObject(address); DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** '); DBMS_OUTPUT.PUT_LINE(address.street_address); DBMS_OUTPUT.PUT_LINE(address.postal_code); DBMS_OUTPUT.PUT_LINE(address.city); DBMS_OUTPUT.PUT_LINE(address.state_province); DBMS_OUTPUT.PUT_LINE(address.country_id); ELSE DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); END IF; COMMIT; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN new_addresses := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages'); END; END LOOP; END; /
手順1で作成したプロシージャを実行し、デキューするメッセージのコンシューマを指定します。次に例を示します。
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.get_cust_address('OE');
この項では、次の要素をデータベースで構成する方法について説明します。
データベースのANYDATA
キューにメッセージをエンキューするエンキュー・プロシージャ。この例では、エンキュー・プロシージャでトリガーを使用して、oe.orders
表に行が挿入されるたびにメッセージをエンキューします。
ルールに基づいて永続LCRおよび永続ユーザー・メッセージをデキューできるメッセージ・クライアント。この例では、メッセージ・クライアントでルールを使用し、oe.orders
表に関連するメッセージのみをデキューします。メッセージ・クライアントでDBMS_STREAMS_MESSAGING
のDEQUEUE
プロシージャを使用して、メッセージを1つずつデキューし、注文の注文番号を表示します。
メッセージ・クライアントに対するメッセージ通知。この例では、メッセージ・クライアントで使用されるキューにメッセージがエンキューされると、電子メール・アドレスに通知が送信されます。このメッセージは、メッセージ・クライアントのルール・セットを満たすため、メッセージ・クライアントによってデキューできます。また、対象メッセージがエンキューされた場合にアプリケーションにアラートを通知するメッセージ通知を構成することもできます。これらのアプリケーションでは、カスタマイズされた方法でメッセージをデキューおよび処理できます。通知は、指定したHTTPポストに送信することもできます。
DBA_STREAMS_MESSAGE_CONSUMERS
データ・ディクショナリ・ビューを問い合せると、既存のメッセージ・クライアントおよび通知の詳細を参照できます。
メッセージ・クライアントおよびメッセージ通知を構成するには、次の手順を実行します。
SQL*Plusで、権限を付与でき、提供されているパッケージ内のサブプログラムを実行できる管理ユーザーとして接続します。
SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。
電子メールを送信する場合に使用されるホスト名、メール・ポート、およびDBMS_AQELM
パッケージを使用して電子メール通知の電子メール・メッセージを送信する電子メール・アカウントを設定します。次の例では、メール・ホスト名をsmtp.example.com
、メール・ポートを25
、電子メール・アカウントをMary.Smith@example.com
に設定します。
BEGIN DBMS_AQELM.SET_MAILHOST('smtp.example.com') ; DBMS_AQELM.SET_MAILPORT(25) ; DBMS_AQELM.SET_SENDFROM('Mary.Smith@example.com'); END; /
正しいメール・ホスト、メール・ポートおよび送信元電子メール・アドレスがわからない場合は、データベースを実行しているコンピュータ・システムのシステム管理者に問い合せてください。
メッセージ・クライアントの作成、メッセージのエンキューとデキュー、およびメッセージ通知の指定を行うユーザーに対して必要な権限を付与します。この例では、oe
ユーザーがこれらすべてのタスクを実行します。
GRANT EXECUTE ON DBMS_AQ TO oe; GRANT EXECUTE ON DBMS_STREAMS_ADM TO oe; GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe; BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'oe', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'oe', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, grantee => 'oe', grant_option => FALSE); END; /
データベースにoe
ユーザーとして接続します。
次の例のように入力して、SET_UP_QUEUE
を使用してANYDATA
キューを作成します。
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe.notification_queue_table', queue_name => 'oe.notification_queue'); END; /
次の例のように入力して、ユーザー・メッセージの型を作成します。
CREATE TYPE oe.user_msg AS OBJECT( object_name VARCHAR2(30), object_owner VARCHAR2(30), message VARCHAR2(50)); /
次の例のように入力して、注文がoe.orders
表に挿入されるたびにメッセージをキューにエンキューするトリガーを作成します。
CREATE OR REPLACE TRIGGER oe.order_insert AFTER INSERT ON oe.orders FOR EACH ROW DECLARE msg oe.user_msg; str VARCHAR2(2000); BEGIN str := 'New Order - ' || :NEW.ORDER_ID || ' Order ID'; msg := oe.user_msg( object_name => 'ORDERS', object_owner => 'OE', message => str); DBMS_STREAMS_MESSAGING.ENQUEUE ( queue_name => 'oe.notification_queue', payload => ANYDATA.CONVERTOBJECT(msg)); END; /
次の例のように入力して、キューからメッセージをデキューするメッセージ・クライアント、およびデキューするメッセージを判断する場合にそのメッセージ・クライアントで使用されるルールを作成します。
BEGIN DBMS_STREAMS_ADM.ADD_MESSAGE_RULE ( message_type => 'oe.user_msg', rule_condition => ' :msg.OBJECT_OWNER = ''OE'' AND ' || ' :msg.OBJECT_NAME = ''ORDERS'' ', streams_type => 'dequeue', streams_name => 'oe', queue_name => 'oe.notification_queue'); END; /
次の例のように入力して、メッセージ・クライアントでデキュー可能なメッセージのエンキュー時に電子メールを送信するメッセージ通知を設定します。
BEGIN DBMS_STREAMS_ADM.SET_MESSAGE_NOTIFICATION ( streams_name => 'oe', notification_action => 'Mary.Smith@example.com', notification_type => 'MAIL', include_notification => TRUE, queue_name => 'oe.notification_queue'); END; /
次の例のように入力して、メッセージ・クライアントを使用してメッセージをデキューするPL/SQLプロシージャを作成します。
CREATE OR REPLACE PROCEDURE oe.deq_notification(consumer IN VARCHAR2) AS msg ANYDATA; user_msg oe.user_msg; num_var PLS_INTEGER; more_messages BOOLEAN := TRUE; navigation VARCHAR2(30); BEGIN navigation := 'FIRST MESSAGE'; WHILE (more_messages) LOOP BEGIN DBMS_STREAMS_MESSAGING.DEQUEUE( queue_name => 'oe.notification_queue', streams_name => consumer, payload => msg, navigation => navigation, wait => DBMS_STREAMS_MESSAGING.NO_WAIT); IF msg.GETTYPENAME() = 'OE.USER_MSG' THEN num_var := msg.GETOBJECT(user_msg); DBMS_OUTPUT.PUT_LINE(user_msg.object_name); DBMS_OUTPUT.PUT_LINE(user_msg.object_owner); DBMS_OUTPUT.PUT_LINE(user_msg.message); END IF; navigation := 'NEXT MESSAGE'; COMMIT; EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN navigation := 'NEXT TRANSACTION'; WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN more_messages := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages.'); WHEN OTHERS THEN RAISE; END; END LOOP; END; /
次の例のように入力して、oe.orders
表に行を挿入します。
INSERT INTO oe.orders VALUES(2521, 'direct', 144, 0, 922.57, 159, NULL); INSERT INTO oe.orders VALUES(2522, 'direct', 116, 0, 1608.29, 153, NULL); COMMIT; INSERT INTO oe.orders VALUES(2523, 'direct', 116, 0, 227.55, 155, NULL); COMMIT;
メッセージ通知によって、エンキューされた各メッセージに対して手順9で指定した電子メール・アドレスにメッセージが送信されます。各通知は、AQXmlNotification
です。この通知には、次のものが含まれています。
notification_options
: 次のものが含まれています。
destination
: メッセージのデキュー元の宛先キュー
consumer_name
- メッセージをデキューしたメッセージ・クライアントの名前
message_set
- メッセージ・プロパティのセット
次に、電子メール通知で送信されるAQXmlNotification
の書式の例を示します。
<?xml version="1.0" encoding="UTF-8"?> <Envelope xmlns="http://ns.oracle.com/AQ/schemas/envelope"> <Body> <AQXmlNotification xmlns="http://ns.oracle.com/AQ/schemas/access"> <notification_options> <destination>OE.NOTIFICATION_QUEUE</destination> <consumer_name>OE</consumer_name> </notification_options> <message_set> <message> <message_header> <message_id>CB510DDB19454731E034080020AE3E0A</message_id> <expiration>-1</expiration> <delay>0</delay> <priority>1</priority> <delivery_count>0</delivery_count> <sender_id> <agent_name>OE</agent_name> <protocol>0</protocol> </sender_id> <message_state>0</message_state> </message_header> </message> </message_set> </AQXmlNotification> </Body> </Envelope>
oe.deq_notification
プロシージャを次のように実行すると、この例でエンキューされたメッセージをデキューできます。
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.deq_notification('OE');
注意: DBMS_AQ パッケージを使用して、通知を構成することもできます。DBMS_AQ パッケージでは、DBMS_STREAMS_ADM パッケージでは使用できない通知機能(バッファ済メッセージの通知、時間による通知のグループ化など)が提供されています。 |
関連項目:
|