ヘッダーをスキップ
Oracle Streams概要および管理
11g リリース1(11.1)
E05775-02
  目次
目次
索引
索引

前へ
前へ
 
次へ
次へ
 

13 Oracle Streamsメッセージ環境の構成

Oracle Streamsによって、キュー伝播、エンキュー機能およびデキュー機能を使用したメッセージングが有効になります。キューには、ANYDATAキューまたは型付きキューを使用できます。ANYDATAキューでは、ペイロードの型がANYDATAユーザー・メッセージがステージングされます。ANYDATAペイロードは、様々なデータ型のペイロードのラッパーとして使用できます。型付きキューでは、特定のタイプのメッセージをステージングできます。

次の各項では、Oracle Streamsメッセージ環境の構成について説明します。

この章で説明する各タスクは、特に明記されていないかぎり、適切な権限を付与されているOracle Streams管理者が完了する必要があります。この章の例では、各データベースのOracle Streams管理者が構成済であると想定されています。


関連項目:

  • 第3章「Oracle Streamsのステージングと伝播」

  • 「Oracle Streams管理者の構成」

  • Oracle Streamsメッセージ環境の構成例は、Oracle Database 2日でデータ・レプリケーションおよび統合ガイドを参照

  • Oracle Streamsアドバンスト・キューイング(AQ)および型付きキューの使用の詳細は、Oracle Streamsアドバンスト・キューイング・ユーザーズ・ガイドを参照

  • ANYDATAタイプの詳細は、Oracle Database PL/SQLパッケージ・プロシージャおよびタイプ・リファレンスを参照


ANYDATAラッパーでのユーザー・メッセージ・ペイロードのラップおよびそれらのエンキュー

ANYDATAペイロードのほとんどすべての型のペイロードはラップできます。次の各項では、ANYDATAキューに対するメッセージのエンキューおよびデキューの例を示します。

次に、ANYDATAペイロードの様々な型のペイロードをラップする手順を示します。

  1. SQL*Plusで、ユーザーの作成、権限の付与、表領域の作成およびユーザーの変更を実行できる管理者ユーザーとして、dbs1.example.comデータベースに接続します。

    SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。

  2. DBMS_AQパッケージのEXECUTE権限をoeユーザーに付与し、このユーザーがこのパッケージのENQUEUEプロシージャおよびDEQUEUEプロシージャを実行できるようにします。

    GRANT EXECUTE ON DBMS_AQ TO oe;
    
  3. dbs1.example.comデータベースにOracle Streams管理者として接続します。

  4. ANYDATAキューが存在しない場合は作成します。

    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe_q_table_any',
        queue_name   => 'oe_q_any',
        queue_user   => 'oe');
    END;
    /
    

    oeユーザーは、oe_q_anyキュー保護キュー・ユーザーとして自動的に構成され、このキューのENQUEUE権限およびDEQUEUE権限を付与されます。また、oeという名前のOracle Streams AQエージェントが構成され、oeユーザーに関連付けられます。ただし、メッセージをデキューできるサブスクライバが構成されないかぎり、メッセージをエンキューすることはできません。

  5. oe_q_anyキューのサブスクライバを追加します。このサブスクライバによって、メッセージの明示的デキューが実行されます。

    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('OE', NULL, NULL);  
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name  =>  'strmadmin.oe_q_any',
        subscriber  =>  subscriber);
    END;
    /
    
  6. dbs1.example.comデータベースにoeユーザーとして接続します。

  7. ANYDATA型のオブジェクトを入力パラメータとして使用し、ペイロードが含まれているメッセージを既存のANYDATAキューにエンキューするプロシージャを作成します。

    CREATE OR REPLACE PROCEDURE oe.enq_proc (payload ANYDATA) 
    IS
      enqopt     DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid  RAW(16);
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL); 
      DBMS_AQ.ENQUEUE(
        queue_name          =>  'strmadmin.oe_q_any',
        enqueue_options     =>  enqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  enq_msgid);
    END;
    /
    
  8. 適切なConvertdata_typeファンクションを指定することによって、手順7で作成したプロシージャを実行します。次のコマンドでは様々な型のメッセージをエンキューします。

    VARCHAR2型:

    EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW'));
    COMMIT;
    

    NUMBER型:

    EXEC oe.enq_proc(ANYDATA.ConvertNumber('16'));
    COMMIT;
    

    ユーザー定義型:

    BEGIN
      oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
        '1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
    END;
    /
    COMMIT;
    

関連項目:

これらのエンキューされたメッセージのコンテンツの表示の詳細は、「永続キュー内のメッセージの内容の表示」を参照

ANYDATAペイロードでラップされるペイロードのデキュー

次に、ANYDATAペイロードでラップされるペイロードをデキューする手順を示します。この例では、「ANYDATAラッパーでのユーザー・メッセージ・ペイロードのラップおよびそれらのエンキュー」で説明されている手順が完了済であると想定されています。

メッセージをデキューするには、メッセージのコンシューマがわかっている必要があります。キュー内のメッセージのコンシューマを確認するには、キューの所有者として接続し、AQ$queue_table_nameを問い合せます。queue_table_nameはキュー表の名前です。たとえば、oe_q_anyキュー内のメッセージのコンシューマを確認するには、次の問合せを実行します。

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;
  1. SQL*Plusで、データベースにoeユーザーとして接続します。

    SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。

  2. デキューするメッセージのコンシューマが入力として使用されるプロシージャを作成します。次の例のプロシージャでは、oe.cust_address_typのメッセージがデキューされ、メッセージのコンテンツが出力されます。

    CREATE OR REPLACE PROCEDURE oe.get_cust_address (
    consumer IN VARCHAR2) AS
      address         OE.CUST_ADDRESS_TYP;
      deq_address     ANYDATA; 
      msgid           RAW(16); 
      deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T; 
      mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
      new_addresses   BOOLEAN := TRUE;
      next_trans      EXCEPTION;
      no_messages     EXCEPTION; 
      pragma exception_init (next_trans, -25235);
      pragma exception_init (no_messages, -25228);
      num_var         pls_integer;
    BEGIN
         deqopt.consumer_name := consumer;
         deqopt.wait := 1;
         WHILE (new_addresses) LOOP
         BEGIN
          DBMS_AQ.DEQUEUE( 
             queue_name          =>  'strmadmin.oe_q_any',
             dequeue_options     =>  deqopt,
             message_properties  =>  mprop,
             payload             =>  deq_address,
             msgid               =>  msgid);
          deqopt.navigation := DBMS_AQ.NEXT;
          DBMS_OUTPUT.PUT_LINE('****');
          IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
              DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||  
                                    deq_address.GetTypeName()); 
              num_var := deq_address.GetObject(address);    
              DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
              DBMS_OUTPUT.PUT_LINE(address.street_address);
              DBMS_OUTPUT.PUT_LINE(address.postal_code);
              DBMS_OUTPUT.PUT_LINE(address.city);
              DBMS_OUTPUT.PUT_LINE(address.state_province);
              DBMS_OUTPUT.PUT_LINE(address.country_id);
          ELSE
             DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||    
                                   deq_address.GetTypeName()); 
          END IF;  
        COMMIT;   
        EXCEPTION
          WHEN next_trans THEN
          deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
          WHEN no_messages THEN
            new_addresses := FALSE;
            DBMS_OUTPUT.PUT_LINE('No more messages');
         END;
      END LOOP; 
    END;
    /
    
  3. 手順1で作成したプロシージャを実行し、デキューするメッセージのコンシューマを指定します。次に例を示します。

    SET SERVEROUTPUT ON SIZE 100000
    EXEC oe.get_cust_address('OE');
    

メッセージ・クライアントおよびメッセージ通知の構成

この項では、次の要素をデータベースで構成する方法について説明します。

DBA_STREAMS_MESSAGE_CONSUMERSデータ・ディクショナリ・ビューを問い合せると、既存のメッセージ・クライアントおよび通知の詳細を参照できます。

メッセージ・クライアントおよびメッセージ通知を構成するには、次の手順を実行します。

  1. SQL*Plusで、権限を付与でき、提供されているパッケージ内のサブプログラムを実行できる管理ユーザーとして接続します。

    SQL*Plusでデータベースに接続する方法については、Oracle Database管理者ガイドを参照してください。

  2. 電子メールを送信する場合に使用されるホスト名、メール・ポート、およびDBMS_AQELMパッケージを使用して電子メール通知の電子メール・メッセージを送信する電子メール・アカウントを設定します。次の例では、メール・ホスト名をsmtp.example.com、メール・ポートを25、電子メール・アカウントをMary.Smith@example.comに設定します。

    BEGIN
      DBMS_AQELM.SET_MAILHOST('smtp.example.com') ;
      DBMS_AQELM.SET_MAILPORT(25) ;
      DBMS_AQELM.SET_SENDFROM('Mary.Smith@example.com');
    END;
    /
    

    正しいメール・ホスト、メール・ポートおよび送信元電子メール・アドレスがわからない場合は、データベースを実行しているコンピュータ・システムのシステム管理者に問い合せてください。

  3. メッセージ・クライアントの作成、メッセージのエンキューとデキュー、およびメッセージ通知の指定を行うユーザーに対して必要な権限を付与します。この例では、oeユーザーがこれらすべてのタスクを実行します。

    GRANT EXECUTE ON DBMS_AQ TO oe;
    GRANT EXECUTE ON DBMS_STREAMS_ADM TO oe;
    GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe;
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
        grantee      => 'oe', 
        grant_option => FALSE);
    END;
    /
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
        grantee      => 'oe', 
        grant_option => FALSE);
    END;
    /
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, 
        grantee      => 'oe', 
        grant_option => FALSE);
    END;
    /
    
  4. データベースにoeユーザーとして接続します。

  5. 次の例のように入力して、SET_UP_QUEUEを使用してANYDATAキューを作成します。

    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe.notification_queue_table',
        queue_name   => 'oe.notification_queue');
    END;
    /
    
  6. 次の例のように入力して、ユーザー・メッセージの型を作成します。

    CREATE TYPE oe.user_msg AS OBJECT(
      object_name    VARCHAR2(30),
      object_owner   VARCHAR2(30),
      message        VARCHAR2(50));
    /
    
  7. 次の例のように入力して、注文がoe.orders表に挿入されるたびにメッセージをキューにエンキューするトリガーを作成します。

    CREATE OR REPLACE TRIGGER oe.order_insert AFTER INSERT
    ON oe.orders FOR EACH ROW
    DECLARE
      msg            oe.user_msg;
      str            VARCHAR2(2000);
    BEGIN
      str := 'New Order - ' || :NEW.ORDER_ID || ' Order ID';
      msg  := oe.user_msg(
                 object_name   => 'ORDERS',
                 object_owner  => 'OE',
                 message       => str);
      DBMS_STREAMS_MESSAGING.ENQUEUE (
        queue_name   => 'oe.notification_queue',
        payload      => ANYDATA.CONVERTOBJECT(msg));
    END;
    /
    
  8. 次の例のように入力して、キューからメッセージをデキューするメッセージ・クライアント、およびデキューするメッセージを判断する場合にそのメッセージ・クライアントで使用されるルールを作成します。

    BEGIN
      DBMS_STREAMS_ADM.ADD_MESSAGE_RULE (
        message_type   => 'oe.user_msg',
        rule_condition => ' :msg.OBJECT_OWNER = ''OE'' AND  ' ||
                          ' :msg.OBJECT_NAME = ''ORDERS'' ',
        streams_type   => 'dequeue',
        streams_name   => 'oe',
        queue_name     => 'oe.notification_queue');
    END;
    /
    
  9. 次の例のように入力して、メッセージ・クライアントでデキュー可能なメッセージのエンキュー時に電子メールを送信するメッセージ通知を設定します。

    BEGIN
      DBMS_STREAMS_ADM.SET_MESSAGE_NOTIFICATION (
        streams_name         => 'oe',
        notification_action  => 'Mary.Smith@example.com',
        notification_type    => 'MAIL',
        include_notification => TRUE,
        queue_name           => 'oe.notification_queue');
    END;
    /
    
  10. 次の例のように入力して、メッセージ・クライアントを使用してメッセージをデキューするPL/SQLプロシージャを作成します。

    CREATE OR REPLACE PROCEDURE oe.deq_notification(consumer IN VARCHAR2) AS
      msg            ANYDATA;
      user_msg       oe.user_msg;
      num_var        PLS_INTEGER;
      more_messages  BOOLEAN := TRUE;
      navigation     VARCHAR2(30);
    BEGIN
      navigation := 'FIRST MESSAGE';
      WHILE (more_messages) LOOP
        BEGIN
          DBMS_STREAMS_MESSAGING.DEQUEUE(
            queue_name   => 'oe.notification_queue',
            streams_name => consumer,
            payload      => msg,
            navigation   => navigation,
            wait         => DBMS_STREAMS_MESSAGING.NO_WAIT);
          IF msg.GETTYPENAME() = 'OE.USER_MSG' THEN
            num_var := msg.GETOBJECT(user_msg);
            DBMS_OUTPUT.PUT_LINE(user_msg.object_name);
            DBMS_OUTPUT.PUT_LINE(user_msg.object_owner);
            DBMS_OUTPUT.PUT_LINE(user_msg.message);
          END IF;
          navigation := 'NEXT MESSAGE';
          COMMIT;
        EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN
                    navigation := 'NEXT TRANSACTION';
                  WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN
                    more_messages := FALSE;
                    DBMS_OUTPUT.PUT_LINE('No more messages.');
                  WHEN OTHERS THEN
                    RAISE;  
        END;
      END LOOP;
    END;
    /
    
  11. 次の例のように入力して、oe.orders表に行を挿入します。

    INSERT INTO oe.orders VALUES(2521, 'direct', 144, 0, 922.57, 159, NULL);
    INSERT INTO oe.orders VALUES(2522, 'direct', 116, 0, 1608.29, 153, NULL);
    COMMIT;
    INSERT INTO oe.orders VALUES(2523, 'direct', 116, 0, 227.55, 155, NULL);
    COMMIT;
    

メッセージ通知によって、エンキューされた各メッセージに対して手順9で指定した電子メール・アドレスにメッセージが送信されます。各通知は、AQXmlNotificationです。この通知には、次のものが含まれています。

次に、電子メール通知で送信されるAQXmlNotificationの書式の例を示します。

<?xml version="1.0" encoding="UTF-8"?>
<Envelope xmlns="http://ns.oracle.com/AQ/schemas/envelope">
    <Body>
        <AQXmlNotification xmlns="http://ns.oracle.com/AQ/schemas/access">
            <notification_options>
                <destination>OE.NOTIFICATION_QUEUE</destination>
                <consumer_name>OE</consumer_name>
            </notification_options>
            <message_set>
                <message>
                    <message_header>
                        <message_id>CB510DDB19454731E034080020AE3E0A</message_id>
                        <expiration>-1</expiration>
                        <delay>0</delay>
                        <priority>1</priority>
                        <delivery_count>0</delivery_count>
                        <sender_id>
                            <agent_name>OE</agent_name>
                            <protocol>0</protocol>
                        </sender_id>
                        <message_state>0</message_state>
                    </message_header>
                </message>
            </message_set>
        </AQXmlNotification>
    </Body>
</Envelope>

oe.deq_notificationプロシージャを次のように実行すると、この例でエンキューされたメッセージをデキューできます。

SET SERVEROUTPUT ON SIZE 100000
EXEC oe.deq_notification('OE');

注意:

DBMS_AQパッケージを使用して、通知を構成することもできます。DBMS_AQパッケージでは、DBMS_STREAMS_ADMパッケージでは使用できない通知機能(バッファ済メッセージの通知、時間による通知のグループ化など)が提供されています。


関連項目: