Oracle Tuxedo Message Queue (OTMQ)では、次に示す機能がOracle Tuxedoアプリケーション・プログラマに提供されます。
OTMQでは、次に示す基本的なキューイング機能が提供されます。
tpqattach(3c)
を使用してキューにアタッチする必要があります。tpenqplus(3c)
を呼び出して、メッセージングが同期か非同期か、リカバリ可能か否か、および配信失敗時に行うアクションを決定します。 #define MSG_CLAS_EXAMPLES 2000
#define MSG_TYPE_CLIENT_REQ 1
TPQCTL ctl;
Q_ATTACH_CTL qattachctl;
char q_space[16] = "QSPACE";
char q_name[128] = "myqueue1";
long flags;
/* join the application */
if (tpinit(NULL) == -1)
{
(void) fprintf(stderr, "failed to join application: %s\n",
tpstrerror(tperrno));
exit(1);
}
memset(&qattachctl, 0x0, sizeof(qattachctl));
qattachctl.attachmode = TMQ_ATTACH_BY_NAME;
qattachctl.qtype = TMQ_ATTACH_PQ;
qattachctl.namespace_list = NULL;
qattachctl.namespace_list_len = 0;
qattachctl.timeout = 30;
memset(&ctl, 0x0, sizeof(ctl));
ctl.flags |= OTMQ;
flags = TPNOTRAN;
if (tpqattach(q_space, q_name, &ctl, &qattachctl, flags) == -1)
{
(void) fprintf(stderr, "failed to attach q[%s.%s]: %s\n", q_space,
q_name, tpstrerror(tperrno));
(void) tpterm();
exit(1);
}
/* get request buffer */
if ((reqstr = tpalloc("STRING", NULL, len)) == NULL)
{
(void) fprintf(stderr, "unable to allocate STRING buffer: %s",
tpstrerror(tperrno));
exit(1);
}
ctl.msg_class = MSG_CLAS_EXAMPLES; /* user defined message class */
ctl.msg_type = MSG_TYPE_CLIENT_REQ; /* user defined message type */
ctl.block = OTMQ_DEL_WF; /* use synchronous way */
ctl.DIP = OTMQ_DIP_MEM; /* interest point */
ctl.uma = OTMQ_UMA_RTS; /* undelivered message action - return to sender */
ctl.timeout = 30;
/* enqueue the message into the destination queue */
if (tpenqplus(target_qspace, target_qname, &ctl, reqstr, 0, 0) == -1)
{
(void) fprintf(stderr, "Failure to enqueue %s\n", tpstrerror(tperrno)); if (tperrno == TPEDIAGNOSTIC)
{
(void) fprintf(stderr, "Diagnostic code=[%d]\t", ctl.diagnostic);
}
tpfree((char *) reqstr);
(void) tpterm();
exit(1);
}
/* detach from queue */
/* tpqdetach() */
…
char qspacename[16] = "QSPACE";
char qname[128] = "myqueue2";
/* call tpinit to join the application */
/* tpinit() */
…
/* attach to the queue to dequeue message from, then do the dequeue action */
/* tpqattach() */
…
memset(&ctl, 0x0, sizeof(ctl));
ctl.flags |= OTMQ;
flags = TPNOTRAN;
/* get request buffer, allocate a buffer to store the dequeued message */
len = 100;
if ((reqstr = tpalloc("STRING", NULL, len)) == NULL)
{
(void) fprintf(stderr, "unable to allocate STRING buffer: %s",
tpstrerror(tperrno));
(void) tpterm();
exit(1);
}
/* dequeue the message from the queue */
ctl.timeout = 30;
if (tpdeqplus(qspacename, qname, &ctl, &reqstr, &len, 0) == -1)
{
if (tperrno == TPEDIAGNOSTIC)
{
(void) fprintf(stderr, "Diagnostic code=[%d]\t", ctl.diagnostic);
} else
{
(void) fprintf(stderr, "Failure to dequeue %s\n", tpstrerror(tperrno));
}
tpfree((char *) reqstr);
(void) tpterm();
exit(1);
}
/* detach from queue */
/* tpqdetach() */
…
tpqgetmsga(3c)
を呼び出します。/* first define the user action to be done when message arrive */
int gotMessage = 0;
int msgAction(long * flag)
{
printf("Get asynchronous message [%s],flag=0x%X\n",reqstr,flag);
gotMessage = 1;
}
int main(int argc, char **argv)
{
char qspacename[16] = "QSPACE";
char qname[128] = "myqueue1";
...
/* join the application */
/* tpinit() */
…
/* attach to the queue to dequeue message from */
/* tpqattach() */
…
memset(&qctl,0,sizeof(qctl));
qctl.flags |= OTMQ;
qctl.filter_idx = -1; /* no message filter designated, get the first available message in queue */
size_user_data=100;
if( tpqgetmsga(qspacename,
qname,
(TPQCTL *)&qctl,
(char **)&reqstr,
(long *)&size_user_data,
(long *)&msgAction,
(long *)0,
(long *)0,
TPNOTIME) != 0)
{
/* print out the error message string or diagnostic code */
…
tpfree((char *) reqstr);
(void) tpterm();
exit(1);
}
/* continue to do other actions, when message arrived in queue,
* user action "msgAction" will be called */
…
}
tpqconfirmmsg(3c)
を呼び出して、メッセージを受け取ったことを確認します。/* join the application */
/* tpinit() */
…
/* attach to the queue to dequeue message from, then do the dequeue action */
/* tpqattach() */
…
/* dequeue message */
/* tpdeqplus() */
…
/* check the message delivery status stored in TPQCTL */
if( ctl.status_block.del_psb_status == OTMQ__CONFIRMREQ)
{
/* This is a message need to be confirmed explicitly,
* use the dequeued message sequence to confirm */
if(tpqconfirmmsg(ctl.seq_number, 0) < 0)
{
/* print out the error message string or diagnostic code */
…
tpfree((char *) reqstr);
(void) tpterm();
exit(1);
}
}
OTMQでは、メッセージ・フィルタが提供され、これに定義した選択基準に一致するメッセージを取得できます。アプリケーションでは、標準のデキューAPIのtpdeqplus(3c)
を呼び出すとき、またはサブスクリプションAPIのtpqsubscribe(3c)
を呼び出すときに、メッセージ・フィルタを指定できます。
OTMQでは、簡易フィルタおよび複合フィルタという、2つのタイプのメッセージ・フィルタがサポートされています。簡易フィルタのライフサイクルは、1回の操作(デキューまたはサブスクリプション)のみです。これに対して複合フィルタは、事前定義が可能で、その後のデキュー操作での再利用が可能です。
タイプの異なるメッセージ・フィルタは、フォーマットの種類も異なります。次の項では、簡易フィルタまたは複合フィルタに指定できる選択基準を説明します。
簡易フィルタは、次に示す選択基準のうちの1つから構成されます。
表1は、選択基準が、選択モードおよび値のペアとして、どのように定義されるかを示しています。
#define MSG_CLAS_EXAMPLES 2000
#define MSG_TYPE_CLIENT_REQ 1
TPQCTL ctl;
….
/* join the application */
/* tpinit() */
…
/* attach to the Qspace */
/* tpqattach() */
…
/* select by message attributes */
ctl.flags |= TPQGETBYMSGCLASS;
ctl.msg_class = MSG_CLAS_EXAMPLES;
ctl.flags |= TMQGETBYMSGTYPE;
ctl.msg_type = MSG_TYPE_CLIENT_REQ;
ctl.flags |= TPQGETBYPRIORITY;
ctl.priority = 50;
…
/* call tpdeqplus to dequeue a message with
* message class is "MSG_CLAS_EXAMPLES",
* message type is "MSG_TYPE_CLIENT_REQ" and
* message priority is 50 */
if (tpdeqplus(qspacename, qname, &ctl, &reqstr, &len, 0) == -1)
{
/* deal with failed scenario */
……
}
…
/* detach from Qspace */
/* tpqdetach() */
…
複合フィルタにより、メッセージを受け取るための複雑な選択基準を定義できます。選択基準の配列により、キューに対して、メッセージ受信の優先度、範囲チェックのための2つの比較キー、およびメッセージがキューから選択される順序を決定する順序キーによる検索を指定できます。
アプリケーションでは、まずtpqsetselect(3c)
関数を呼び出してフィルタを定義し、その戻り値としてインデックスのハンドルを受け取り、このハンドルを使用して標準のデキューAPIでメッセージを取得できます。
またアプリケーションでは、リスト6に示すように、tpqcancelselect(3c)
を呼び出し、前に定義した複合フィルタを取り消すことができます。
char qspacename[16] = "QSPACE";
char qname[128] = "myqueue1";
char src_qname[128] = "from_que";
TPQctl ctl;
selection_array_component_tp selection_array;
short num_masks = 1;
int index_handle = -1;
/* join the application */
/* tpinit() */
/* attach to the Qspace */
/* tpqattach() */
/* set complex filter to dequeue message with specific message class,
* and from specific queue*/
memset(&selection_array, 0x0, sizeof(selection_array));
selection_array.key_1_offset = OTMQ_CLASS;
selection_array.key_1_size = 4;
selection_array.key_1_value = MSG_CLAS_EXAMPLES;
selection_array.key_1_oper = OTMQ_OPER_EQ;
selection_array.key_2_offset = OTMQ_SOURCE;
selection_array.key_2_size = 4;
selection_array.key_value_qspace = qspacename;
selection_array.key_value_queue = src_qname;
selection_array.key_2_oper = OTMQ_OPER_EQ;
if(tpqsetselect(&selection_array, &num_masks, &index_handle ) == -1)
{
/* deal with failed scenario */
…
}
ctl.filter_idx = index_handle; /* using the filter to dequeue */
if(tpdeqplus(qspacename, qname, &ctl, &reqstr, &len, 0) == -1)
{
/* deal with failed scenario */
…
}
/* need to cancel the filter using the index */
if(tpqcancelselect(&index_handle)== -1)
{
/* deal with failed scenario */
…
}
/* detach from Qspace */
/* tpqdetach() */
…
詳細は、『Oracle Tuxedo Message Queueリファレンス・ガイド』のtpqsetselect(3c)
およびtpqcancelselect(3c)
に関する項を参照してください。
パブリッシャは、メッセージを特別な「トピック」に送信することによりメッセージをブロードキャストします。各トピックは、ブロードキャスト・ストリームを表します。ブロードキャスト・ストリームは、特定のトピックに送られたメッセージを受信するように登録されているターゲット・キューのセットです。サブスクライバは、特定のブロードキャスト・メッセージを受信するため、まずトピックに登録する必要があります。
OTMQのメッセージ・キュー・マネージャ・サーバーにより、特定のトピックに関心を持つユーザー・プロセスのリストが保持されます。さらに、このサーバーにより、アプリケーションでtpqsubscribe(3c)
を使用して設定したブロードキャスト・ストリームから選択的にメッセージを抽出するために使用される、様々なユーザー定義のルール(pub/subフィルタとも呼ばれます)が保持されます。
パブリッシャは、tpqpublish(3c)
を使用してブロードキャスト・メッセージを送信でき、サブスクライバは、アタッチしたキューからサブスクライブ済のメッセージを取得できます。
メッセージをブロードキャストするため、パブリッシャのプログラムでは、メッセージ配信にブロードキャスト・ストリームを使用することがわかっているトピックにメッセージを送ります。アプリケーションでtpqpublish(3c)
関数を呼び出すと、OTMQのメッセージ・キュー・マネージャ・サーバーによってtpqpublish(3c)
の呼び出しが処理され、そのメッセージが、対応するすべての受信者に透過的にリダイレクトされます。
OTMQのメッセージ・キュー・マネージャ・サーバーでは、それぞれのサブスクリプション・ルールのエントリでいくつの選択が一致したかには関係なく、各ターゲット・キューに対し、ブロードキャスト・ストリームの1つのメッセージに1つのコピーのみが配信されます。
ブロードキャスト・メッセージは、自動メッセージ・リカバリ用には格納できません。
ブロードキャスト・メッセージを受信するには、アプリケーションで、標準のAPIのtpqsubscribe(3c)
を使用し、ローカルまたはリモートのOTMQのメッセージ・キュー・マネージャ・サーバーに受信者として登録します。
ブロードキャスト・メッセージを受信するには、アプリケーションで、OTMQのメッセージ・キュー・マネージャ・サーバーによって管理されるブロードキャスト・ストリーム(トピック)としてキューを登録します。
受信者、つまりサブスクライブするアプリケーションでは、tpqsubscribe(3c)
関数を使用してブロードキャスト・メッセージを登録します。登録には、文字列フォーマットのトピックに加え、アプリケーションで受信するメッセージに関連するなんらかの選択基準(フィルタ)が必要です。
アプリケーションでは、tpqsubscribe(3c)
関数を使用し、次に示す情報を提供してブロードキャスト・メッセージをサブスクライブします。
ブロードキャスト・メッセージの選択的な受信を登録するには、tpqsubscribe(3c)
のメッセージ・フィルタを使用します。このサブスクリプション・リクエストは、単一の選択ルールに適合するブロードキャスト・ストリームのすべてのメッセージのコピーを受信するターゲット・キューを登録します。
フィルタは、OTMQのメッセージ・キュー・サーバーがメッセージをサブスクライバにブロードキャストする前に評価される、ブール型のフィルタ・ルールを含む文字列です。フィルタ・ルールは、それが適用される型付きバッファに固有のものです。文字列タイプのメッセージでは、フィルタ・ルールは正規表現です。FMLバッファのメッセージでは、フィルタ・ルールはFMLブール・コンパイラ渡すことのできる文字列です(Tuxedo ATMIのFML関数Fboolco
を参照)。
表2に、フィルタの正規表現ルールがどのように定義できるかを示します。
優先順位のレベルは3つあります。結合の強さの順に並べると、次のようになります。
前述のとおり、かっこは優先順位が高いことを明示的に示すために使用します。
tpqsubscribe(3c)
は、サブスクライバにサブスクリプション・ハンドルを返します。このハンドルは、そのサブスクリプションのサブスクライブ解除に使用します。
メッセージがブロードキャスト・ストリームに送信されるとき、OTMQのメッセージ・キュー・マネージャ・サーバーにより、その種類のメッセージを受信するように登録されているアプリケーションが決定されます。そして、それらのメッセージが、すべての一致するアプリケーションに自動的に配布されます。受信側アプリケーションでは、標準のデキュー関数を使用してターゲット・キューからブロードキャスト・メッセージを読み取ります。メッセージにより、受信側アプリケーションに送信者のソース・キュー・アドレスも通知されます。
アプリケーションでは、tpqunsubscribe(3c)
の呼び出しにより、ブロードキャスト・ストリームからサブスクライブ中のメッセージを取り出すことができます。これにより、リスト7に示すように、内部の登録表からサブスクリプション・エントリが削除されます。
TPEVCTL evctl;
long evt_handle = 0L ; /* Event Subscription handles */
…
/* join the application */
/* tpinit() */
…
/* attach to the Qspace */
/* tpqattach() */
…
evctl.flags = TPEVQUEUE ;
(void)strcpy (evctl.name1, "QSPACE") ;
(void)strcpy (evctl.name1, "myqueue1") ;
evctl.qctl.flags |= OTMQ;
/* Subscribe */
evt_handle = tpqsubscribe ("TMQ:QNOT:QSPACE:mytopic",
NULL,&evctl,TPSIGRSTRT) ;
if (evt_handle == -1L) {
/* deal with failed scenario */
…
}
…
/* dequeue subscribed message */
if(tpdeqplus(qspacename, qname, &ctl, &reqstr, &len, 0) == -1)
{
/* deal with failed scenario */
…
}
/* Unsubscribe to the subscribed topic */
if (tpqunsubscribe (evt_handle, TPSIGRSTRT) == -1)
{
/* deal with failed scenario */
…
}
詳細は、『Oracle Tuxedo Message Queueリファレンス・ガイド』のtpqsubscribe(3c)およびtpqunsubscribe(3c)に関する項を参照してください。
アプリケーションでは、OTMQ標準のエンキュー関数tpenqplus(3c)
を使用し、リカバリ可能またはリカバリ不可の2つの配信モードのうちのいずれかを使用してメッセージを送信します。リカバリ不可として送信されたメッセージは、ターゲット・キューに配信できなかった場合、アプリケーションでエラー・リカバリ手続きを実装していないかぎり、失われます。リカバリ可能として送信されたメッセージは、システム、プロセスおよびネットワークの障害があっても、OTMQのメッセージ・キュー・マネージャ・サーバーおよびオフライン・トレード・ドライバにより、ターゲット・キューへの自動的な配信が保証されます。
保証付き配信を実現するため、リカバリ可能メッセージは、OTMQのメッセージ・キュー・マネージャ・サーバーによって送信側または受信側システムの不揮発性のストレージに書き込まれます。エラー状態のためにメッセージが配信できない場合は、OTMQのオフライン・トレード・ドライバにより、メッセージがリカバリ・ジャーナルから読み出され、配信が確認できるまでターゲット・キューへのメッセージ再配信が行われます。
アプリケーションの開発者は、アプリケーションでの必要性に応じて、どのメッセージをリカバリ可能として送信すべきかを決定します。リカバリ可能なメッセージングは、メッセージをディスクに格納する追加の手順を必要とするため、その分の処理時間と処理能力が余計に必要となります。パフォーマンスを最大化するには、リカバリ可能なメッセージングは、アプリケーションの処理にとってそれが不可欠な場合にのみ使用してください。
OTMQのリカバリ可能メッセージング機能には、次のような利点があります。
OTMQでは、配信不能キュー(DLQ)やメッセージが配信できなかった場合にメッセージを送信者に送り返す機能など、リカバリ不可メッセージのためのエラー・リカバリ機能も提供されます。このトピックでは、お客様のアプリケーションのために正しい選択ができるように、OTMQのすべての配信モードを説明します。
配信のリカバリ可能とリカバリ不可の選択は、アプリケーションでの必要性に基づきます。メッセージ送信に適切な方式を判断するため、アプリケーションの開発者は次のような点を決定します。
tpenqplus(3c)
関数で指定する配信モードは、次のような動作を決定します。
OTMQでは、リカバリ可能と指定されたメッセージの格納にメッセージ・リカバリ・ジャーナルが使用されます。ローカル・システムのメッセージ・リカバリ・ジャーナルは、ストア・アンド・フォワード(SAF)と呼ばれます。リモート・システムのメッセージ・リカバリ・ジャーナルは、宛先キュー・ファイル(DQF)と呼ばれます。リカバリ可能メッセージが配信できなかった場合、SAFまたはDQFキューのいずれかに格納され、ターゲット・グループとの通信が回復したときに、自動的にターゲットに再送されます。
OTMQでは、追加のメッセージ・リカバリ機能を提供するため、補助ジャーナルが使用されます。配信不能キュー(DLQ)は、配信不可メッセージを格納するためのメモリー・ベースのキューです。使用不能のレター・ジャーナル(DLJ)により、自動リカバリでは格納できなかったメッセージのためのディスク記憶域が提供されます。DLQまたはDLJに格納されている配信不可メッセージは、ユーザーまたはアプリケーションの制御により、OTMQのジャーナルAPIのtpqreadjrn(3c)
を使用して取得できます。
事後確認ジャーナル(PCJ)には、正常に確認済のリカバリ可能メッセージが格納されます。
OTMQのメッセージ・リカバリ・システムでメッセージを格納できない場合、配信不可メッセージ・アクション(UMA)が実行されます。一部のUMAでは、後で、ユーザーまたはアプリケーションの制御によってメッセージのリカバリが可能となります。
配信モードは、ブロック・タイプ(block)および配信インタレスト・ポイント(DIP)の2つのコンポーネントから構成されます。blockおよびDIPは、TPQCTL構造体に指定します。
リカバリ不可の配信インタレスト・ポイントにより、送信側プログラムでは、メッセージがターゲット・キューに格納されたとき(MEM)、メッセージがターゲット・キューから読み取られたとき(DEQ)、またはメッセージがターゲット・キューから読み取られ、tpqconfirmmsg(3c)関数を使用して受信側プログラムにより明示的に確認されたとき(ACK)に、通知を受信できます。
リカバリ可能配信インタレスト・ポイントが選択されている場合、メッセージは自動リカバリによってディスクに格納されます。リカバリ可能配信インタレスト・ポイントにより、送信側プログラムでは、メッセージをローカルのリカバリ・ジャーナルに格納するか(SAF)、メッセージをリモートのリカバリ・ジャーナルに格納するか(DQF)、またはメッセージをリモートのリカバリ・ジャーナルに格納して、メッセージがターゲットのアプリケーションで確認されたときに通知を受信するか(CONF)のいずれかが可能となります。
OTMQでは、ブロック・タイプと配信インタレスト・ポイントのすべての組合せはサポートされていません。表3に、有効な配信モードとその意味を示します。
リカバリ不可メッセージの配信は、最も高速で、最も効率的なメッセージ送信の方法です。次のような場合には、リカバリ不可配信モードを使用します。
リカバリ可能メッセージの配信は、最も安全なメッセージ送信の方法ですが、各メッセージを送信前に格納する必要があるため、かなりの処理上のオーバーヘッドを伴います。次のような場合には、リカバリ可能配信モードを使用します。
tpenqplus(3c)
関数を使用し、配信の引数との論理積により、UMA引数でメッセージが配信インタレスト・ポイントに配信できなかった場合に何をすべきかを指定できます。
リカバリ不可メッセージングでは、UMAは、メッセージがターゲット・キューに格納できなかった場合に実行されるアクションです。UMAが指定されていない場合、OTMQでは、メッセージを破棄するというデフォルトのアクションが実行されます。
リカバリ可能メッセージングでは、UMAは、メッセージがSAFまたはDQFキューのいずれにも格納できなかった場合に実行されるアクションです。リカバリ可能配信モードでは、OTMQのメッセージ・キュー・マネージャ・サーバーによるメッセージ配信が保証されないときの例外処理をお客様のアプリケーションで実行する必要があるため、必ずUMAを指定します。
リカバリ可能メッセージングでは、次のような場合にUMAが実行されます。
表4に、5つの有効なUMAを示します。
リカバリ可能メッセージを送信するには、TPQCTL構造体に適切なブロック・タイプ、DIPおよびUMAを指定してtpenqplus(3c)
関数を使用します。
リカバリ可能メッセージ送信のメッセージ・フローは次のようになります。
詳細は、『Oracle Tuxedo Message Queueリファレンス・ガイド』のtpenqplus(3c)
に関する項を参照してください。
リカバリ可能メッセージを受信するには、tpdeqplus(3c)
関数を使用します。リカバリ可能メッセージがターゲット・キューに配信されるとき、アプリケーションでは、次の処理を実行する必要があります。
リカバリ可能メッセージ受信のメッセージ・フローは次のようになります。
tpqconfirmmsg(3c)
関数を呼び出し、リカバリ可能メッセージの受信を確認する必要があります。そのキューが暗黙的な確認用に構成されている場合、確認メッセージは、tpdeqplus(3c)
によってリカバリ可能メッセージが正常にデキューされた後で自動的に送信されます。詳細は、『Oracle Tuxedo Message Queueリファレンス・ガイド』のtmqadmin(1)
に関する項を参照してください。tpqconfirmmsg(3c)
関数は、オフライン・トレード・ドライバに対してメッセージの正常配信を通知する確認通知を送信し、これにより、オフライン・トレード・ドライバで、メッセージ・リカバリ・ジャーナル・キューからそのメッセージを削除できます。DISC UMAを使用すると、配信モードの引数で指定された配信インタレスト・ポイントに配信できなかった場合、そのメッセージは破棄されます。DISC UMAは、送信側プログラムが各例外をその発生時に処理する場合に使用されます。OTMQでは、メッセージのコンテンツが送信側プログラムとの関連の中でも引き続き使用可能であるため、配信不可メッセージを破棄できます。
RTS UMAを使用すると、配信モードの引数で指定された配信インタレスト・ポイントに配信できなかった場合、そのメッセージは送信側プログラムのレスポンス・キューにリダイレクトされます。RTS UMAは、送信側プログラムが各例外をその発生時に処理したくない場合に使用されます。それにかわり、送信側プログラムでは、エラー処理のため、配信不可メッセージをそのメイン入力ストリームにリダイレクトします。
RTS UMAを使用する利点は、送信側プログラムが1つのキューにアタッチし、各メッセージを読み込むたびにそれに基づいて処理を行えることです。送信側プログラムでは、そのメッセージが新規のものか、または配信不可メッセージかを判断するために、各メッセージのPSBステータスの配信の値を読み取る必要があります。メッセージ・リカバリ・システムによって格納できなかったメッセージ、およびエラー処理を要求するメッセージは、ステータスとしてOTMQ__MSGUNDELが戻されます。
SAF UMAを使用すると、メッセージ・リカバリ・システムによってリモートのジャーナル・キューに格納できなかったメッセージは、ローカルのジャーナル・キューに格納されます。SAF UMAは、DQFおよびCONFのリカバリ可能配信インタレスト・ポイントとともに使用できまが、WF_SAF配信モードでは機能しません。
SAF UMAの使用は、送信側システムと受信側システム間のフロー制御の管理に役立ちます。リソースの不足やグループ間リンクの障害により、メッセージがリモートのジャーナル・キューに格納できなかった場合、そのメッセージはローカルのジャーナル・キューに書き込まれます。
注意: ここでのSAFは、ほぼ「SAF」のDIPと同じ意味です。メッセージが配信できないと、メッセージはSAFキューに格納されます。
DLQ UMAを使用すると、配信モードの引数で指定された配信インタレスト・ポイントに配信できなかった場合、そのメッセージは配信不能キューにリダイレクトされます。DLQ UMAは、送信側プログラムで、各メッセージを別々に処理することを可能としながら、指定したキューの配信不可メッセージのエラー処理を一元化する場合に使用されます。
配信不能キューは、各OTMQのメッセージ・キューイング・グループの標準的なグループ構成の一部です。これにより、自動リカバリでは格納できなかった、そのグループのすべての配信不可メッセージのためのメモリー・ベースの記憶域が提供されます。配信不能キューは、キュー番号96として定義され、tmqadmin(1)
のqspacecreate
コマンドにより自動的に作成されます。
配信不能キューを使用するには、送信側プログラムで、適切な配信引数、およびUMA引数としてOTMQ_UMA_DLQを使用してtpenqplus(3c)
関数を呼び出します。受信側プログラムに配信できなかったすべてのメッセージは、送信者のグループの配信不能キューに書き込まれます。アプリケーション・プログラムでは、tpqreadjrn(3c)
関数を使用して配信不可メッセージを取得でき、tpenqplus(3c)
関数を使用して再配信を試行できます。
配信不能キューを使用する利点は、配信不可メッセージを1つずつリカバリできることです。送信側プログラムまたはそのアプリケーション内の別のプロセスは、DLQにアタッチし、各配信不可メッセージのエラー・リカバリ処理を実行できます。配信不能キューの短所は、配信不可メッセージのためにディスク記憶域が不足することです。送信側ノードのシステム障害は、配信不能キュー内のすべての配信不可メッセージの消失を引き起こします。
DLJ UMAを使用すると、配信モードの引数で指定された配信インタレスト・ポイントに配信できなかった場合、そのメッセージはキュー番号196の補助ジャーナル(使用不能のレター・ジャーナル)に書き込まれます。このUMAは、リカバリ可能メッセージのためにのみ使用できます。DLJ UMAは、送信側プログラムでエラー処理手順を一元化する必要があり、かつアプリケーションで一定間隔の遅延を伴いつつDLJキューから多くのメッセージの再送信をサポートできる場合に使用されます。配信不可メッセージのDLJキューへの格納は、システムが停止してもメッセージが失われないことを保証し、ユーザーやアプリケーションの制御による再配信の試行を可能とします。
使用不能のレター・ジャーナル(DLJ)により、自動リカバリでは格納できなかったメッセージのためのディスク記憶域が提供されます。これは、tmqadmin(1)
のqspacecreate
コマンドにより自動的に作成されます。
使用不能のレター・ジャーナルを使用するには、送信側プログラムで、適切な配信引数、およびUMA引数としてOTMQ_UMA_DLJを指定してtpenqplus(3c)
関数を使用します。メッセージ・リカバリ・システムによって格納できなかったすべてのメッセージは、送信者のグループの使用不能のレター・ジャーナルに書き込まれます。リスト8に示すように、アプリケーション・プログラムでは、tpqreadjrn(3c)
関数を使用して配信不可メッセージを取得でき、tpenqplus(3c)
関数を使用して再配信を試行できます。
TPQCTL ctl;
int type;
…
/* join the application */
/* tpinit() */
/* attach to the QSpace */
/* tpqattach() */
/* get request buffer */
if ((reqstr = tpalloc("STRING", NULL, len)) == NULL)
{
(void) fprintf(stderr, "unable to allocate STRING buffer: %s",
tpstrerror(tperrno));
exit(1);
}
ctl.block = OTMQ_DEL_WF; /* use synchronous way */
ctl.DIP = OTMQ_DIP_SAF; /* interest point */
ctl.uma = OTMQ_UMA_DLJ; /* undelivered message action - Dead Letter Journal*/
/* enqueue the message into the destination queue */
if (tpenqplus(target_qspace, target_qname, &ctl, reqstr, 0, 0) == -1)
{
/* deal with failed scenario */
…
}
…
…
/* done other works, handle failed message in DLJ before exit */
ctl.flags |=OTMQ;
ctl.flags |= TPQREADJRN;
type = DLJ_HANDLE;
if (tpqreadjrn(myqspace, myqueue, &ctl, &rcv_buf, &len, 0) == -1)
{
/* deal with failed scenario */
…
}
/* enqueue the failed message again */
if (tpenqplus(target_qspace, target_qname, &ctl, rcv_buf, 0, 0) == -1)
{
/* deal with failed scenario */
…
}
…
/* detach from the Qspace */
/* tpqdetach() */
…
表5に、配信モードとUMAの有効な組合せを示します。
OTMQの構成により、各メッセージ・キューには作成時に名前が付き、また、実行時にエイリアスを付けることもできます。ネーミングは、メッセージ・キューがローカル・システムにあるかまたは別のシステムにあるかにかかわらず、OTMQアプリケーションで名前/エイリアスによりメッセージ・キューを識別できるようにする強力な機能です。
アプリケーションの開発者は、OTMQのネーミング機能を使用し、アプリケーションをその基礎となるOTMQの環境構成から切り離すことができます。アプリケーション内でメッセージ・キューを名前/エイリアスで参照することで、開発者は、OTMQの環境構成が変わっても、そのアプリケーション・コードを変更する必要がなくなります。
ネーミング・サービスを活用するには、UBB内でTMQ_NA(5)
サーバーを構成する必要があります。
ネーミング・サービスは、OTMQのネーミング・サーバーによって提供されます。これによって、アプリケーションでエイリアスによりキューを参照する実行時のキューのルックアップ、つまり、キューのエイリアスと特定のキュー名との動的バインディングのために、ローカル・ネームスペースおよびグローバル・ネームスペースの両方にアクセスでき、管理できます。OTMQのネーミング・サーバーによって2つのサービスが提供され、1つはローカル・スコープのエイリアス・ルックアップ(ローカル・ネーミング・サービス)、もう1つはグローバルのルックアップ(グローバル・ネーミング・サービス)です。
メッセージ・キューに名前またはエイリアスが定義されるとき、名前のスコープも割り当てられます。キュー名またはエイリアスは、ローカル(qspace内)またはグローバル(qspace間)のスコープを持ちます。ローカルなエイリアスは、そのエイリアスが定義されているのと同じキュー・スペースで実行中のアプリケーションで使用できます。グローバルなエイリアスは、どのアプリケーションでも使用できます。
ネームスペースは、メッセージ・キューのエイリアスとそれに関連するメッセージ・キュー・アドレス(キュー・スペースおよびキュー名)が格納されているリポジトリです。OTMQのネーミング・サーバーでは、メッセージを名前付きキューに送信するため、エイリアスに関連付けられた実際のキュー・アドレスを探し、ネームスペースでエイリアスをルックアップします。
OTMQのネーミング・サーバーでは、プロセス、ローカル(qspace全体)およびグローバル(qspaceを超えた全体)という、3つのレベルのネームスペースが使用されます。OTMQのネーミング・サーバーが起動するとき、ローカル・スコープのエイリアスがローカルのネームスペースに格納されます。グローバル・スコープのエイリアスは、グローバルなネームスペースに格納されます。プロセスのネームスペースは、パフォーマンスの向上に使用されるアプリケーション・キャッシュです。エイリアスは様々なレベルのネームスペースにキャッシュされることがあり、パフォーマンスよりも精度を求めるなら、tpqlocate(3c)
を使用してキャッシュをバイパスできます。
アプリケーションでOTMQにアタッチすると、アプリケーションにより自動的に空のプロセス・ネームスペースが作成されます。プロセスが最初にエイリアスのロケーティング/バインドを行うとき、それがプロセスのネームスペースにキャッシュされます。
OTMQのネーミング・サーバーが起動するとき、ローカルのネームスペースが自動的に作成されます。また、アプリケーションによって定義されたローカル・スコープのキューのエイリアスがローカルのネームスペースにキャッシュされます。
グローバルなネームスペースを作成するには、OTMQのネーミング・サービスがその中でネームスペースをメンテナンスできるディレクトリを作成し、フラットなファイル・システムを使用します。
グローバルなネーミングを使用するには、ネーミング・サーバーが実行中のノード上にグローバル・ネームスペースを作成します。OTMQでは、ユーザーは最大2つのグローバル・ネーミング・サービス(プライマリおよびバックアップ)を構成できます。プライマリが停止したときにその責任を負うバックアップのネーミング・サービスを有効にするには、すべてのグローバル・ネーミング・サービスが同じグローバル・ネームスペースを使用するように構成する必要があります。このため、構成済のネーミング・サービスが異なるシステムで実行されている場合は、共有ファイル・システムを使用します。
ネームスペースを作成したら、ネームスペース用のデバイス名を指定するため、DMQNS_DEVICE
環境変数を設定します(これは、グローバル・ネーミングのためのネームスペースへのアクセスは、システムに依存するためです)。このため、グローバル・ネーミング・サービスを構成するときには、このネームスペースにアクセスするときにはどのデバイス名を使用するのかを指定します。これは、次に示すように環境変数DMQNS_DEVICE
を設定します。
注意: | この環境変数の設定が必要なのは、ネーミング・サービスを提供するOTMQのネーミング・サーバーのみです。グローバル・ネーミング・サービスを使用するには、少なくとも、グローバル・ネームスペース・ファイルのパスの一部が指定されている必要があります。たとえば、UNIXでは「/u/mydir 」のように定義できます。 |
アプリケーションでエイリアスでキューを参照するときは、tpqlocate(3c)
またはtpqbind(3c)
関数を使用し、次のいずれかの方法でエイリアスを指定します。
DMQNS_DEVICE
の値を付加します。さらに、その前に、「/」で始まる環境変数DMQNS_DEFAULTPATHの値が付加されます。たとえば、DMQNS_DEVICE
環境変数が「dev」に設定され、DMQNS_DEFAULTPATHが「/inventory
」に設定されている場合、ネーミング・サービスは、「widgets」という名前を「/dev/inventory/widgets
」で検索することになります。DMQNS_DEVICE
環境変数で指定されているデバイス、およびDMQNS_DEFAULTPATH環境変数の設定を付加します。たとえば、DMQNS_DEVICE
環境変数が「/dev」に設定され、DMQNS_DEFAULTPATHが「/inventory
」に設定されている場合、ネーミング・サービスは、「test/widgets
」という名前を「/dev/inventory/test/widgets
」で検索することになります。DMQNS_DEVICE
環境変数で指定されるデバイス名以外は、名前には何も情報を付加しません。これは、完全修飾名にはフルパス名およびキュー名が含まれることを意味します。たとえば、DMQNS_DEVICE
環境変数が「dev」に設定され、DMQNS_DEFAULTPATHが「/inventory」に設定されている場合、ネーミング・サービスは、「/production/test/widgets」という名前を「/dev/production/test/widgets」で検索することになります。リスト9は、グローバル・ネームスペース・ファイルの例を示しています。PrimaryQ_1 0.0 L
myqueue1 0.0 G
MRQ13_1 1.13 L
SQ14_2 0.0 G
アプリケーションでは、メッセージをキューから読み取ったり、キューにメッセージを送信する前に、tpqattach(3c)
関数を使用してキューにアタッチする必要があります。これにより、エイリアスや名前でキューが識別されます。メッセージを送信するときは、ターゲット・キューは常にその名前で識別されます。アプリケーションでは、直接キュー名を使用するか、またはtpqlocate(3c)
関数を使用してエイリアスからキュー名を導出できます。リスト10に、キューのロケーティングの例を示します。
static char q_space[16] = "QSPACE";
static char q_name[128] = "myqueue1";
…
Q_NAME_CTL name_ctl;
long scope = NAME_SCOPE_P;
/* join the application */
/* tpinit() */
/* attach to the QSpace */
/* tpqattach() */
/* locate queue named "Primary_queue" */
name_ctl.pName = "Primary_queue";
wait_mode = OTMQ_WF_RESP; /* use synchronous way to get response */
if(tpqlocate(q_space, &name_ctl, 0, NULL, &scope, wait_mode, 0) == -1)
{
/* deal with failed scenario */
…
}
OTMQでは、キューのエイリアスとキューのアドレスを関連付けるために、静的および動的の2つのアプローチが提供されます。
静的バインディングでは、ネームスペース・ファイルを使用し、キューのエイリアスでそれに関連付けられているキュー名を参照します。このような関連付けを有効にするには、キュー・スペースの作成時にネームスペース・ファイルを指定するか、またはネーミング・サーバーを停止し、ネームスペース・ファイルを指定するようにキュー・スペースを更新してからネーミング・サーバーを再起動します。詳細は、『Oracle Tuxedo Message Queueリファレンス・ガイド』のtmqadmin(1)
に関する項を参照してください。
動的バインディングでは、アプリケーションの実行時にtpqbind(3c)
を使用してキューのエイリアスをキュー名に関連付け、参照を行います。キューのエイリアスは、tpqbind(3c)
が正常に完了するまでは、特定のキュー名にはバインドされていません。この関連付けを変更するには、アプリケーションで、まずtpqbind(3c)
を使用してキューのエイリアスを特定のキュー名からアンバインドし、再び、同じAPIを使用して別のエイリアスをキュー名に関連付けます。アプリケーションでキューからデタッチしたとき、またはキュー・スペースを終了したとき、ネーミング・サーバーにより、このアプリケーションの関連付けが自動的にアンバインドされます。次のリストに、動的バインディングのキューの例を示します。
static char q_space[16] = "QSPACE";
static char q_name[128] = "myqueue1";
…
Q_NAME_CTL name_ctl;
long scope = NAME_SCOPE_G;
name_ctl.pName = "Primary_queue";
name_ctl.pGroup = q_space;
name_ctl.pQueue = q_name;
bind_time_out = 30;
if(tpqbind(q_space, &name_ctl, &scope, bind_time_out) == -1)
{
/* deal with failed scenario */
…
}
詳細は、『Oracle Tuxedo Message Queueリファレンス・ガイド』のtpqlocate(3c)およびtpqbind(3c)に関する項を参照してください。
WSモードでは、リカバリ可能配信モードを使用して送信されるOTMQメッセージは、サーバー・システムへの接続が使用できない場合、ローカルのストア・アンド・フォワード(SAF)ジャーナル(tmqsaf.jrn
)に書き込まれます。
この機能が有効になっていると、次のような信頼性の高い配信モードを使用して送信されたメッセージがジャーナルに格納されます。
OTMQ_DIP_MEM & OTMQ_DEL_WF (OTMQ_UMA_SAFを使用)
OTMQのWS構成オプションにより、SAFジャーナルは次のように構成できます。
ディスク・ブロックが使用できなくなるまで無制限に拡大する動的ファイル
これらのオプションにより、メッセージ・ジャーナルのため、どのようにディスク・リソースを使用するかを決定できます。無制限に拡大するジャーナル・ファイルでは、メッセージの格納に必要なディスク・ブロックのエクステントが定期的に割り当てられます。すべてのメッセージがSAFから送信されてジャーナルが空になったとき、ジャーナルで使用されていたディスク・ブロックが解放され、ジャーナル・ファイルは削除されます。ジャーナル・ファイルのサイズは、ディスク・ブロック・サイズ(4096バイト)単位です。
Tuxedoのbuildclient(1)およびbuildserver(1)コマンドに対応するものとして、OTMQではbuildqclient(1)
およびbuildqserver(1)
が提供されます。
buildqclient(1)
は、OTMQのクライアント・モジュールを構成するときに使用します。このコマンドにより、指定されたファイルが標準のOracle Tuxedo ATMIライブラリおよびOTMQライブラリと結合され、ロード・モジュールが作成されます。
buildqserver(1)
は、OTMQのサーバー・ロード・モジュールを構成するときに使用します。このコマンドにより、指定されたファイルが標準のサーバー・メイン・ルーチン、標準のOracle Tuxedo ATMIライブラリおよびOTMQライブラリと結合され、ロード・モジュールが作成されます。
詳細は、『Oracle Tuxedo Message Queueリファレンス・ガイド』のbuildqclient(1)およびbuildqserver(1)に関する項を参照してください。