A サンプルXStreamクライアント・アプリケーション

XStreamによって使用されるOracle Databaseコンポーネントを構成する方法を例で示します。この例では、XStreamアウトバウンド・サーバーおよびインバウンド・サーバーと通信するサンプル・クライアント・アプリケーションを構成します。

A.1 サンプルXStreamクライアント・アプリケーションについて

サンプルXStreamクライアント・アプリケーションでは、XStream OutおよびXStream Inアプリケーションで必要な基本的なタスクを説明します。

このアプリケーションは、次のタスクを実行します。

  • XStreamアウトバウンド・サーバーおよびインバウンド・サーバーに接続して、アウトバウンド・サーバーからのLCRを待機します。アウトバウンド・サーバーおよびインバウンド・サーバーは、2つの異なるデータベース内にあります。

  • アウトバウンド・サーバーからLCRを受信すると、すぐにLCRをインバウンド・サーバーに送信します。

  • 定期的にインバウンド・サーバーから処理済最低位置を取得し、この値をアウトバウンド・サーバーに送信します。

  • 定期的にアウトバウンド・サーバーからインバウンド・サーバーに「ping」LCRを送信し、アクティビティが少ないときに、インバウンド・サーバーの処理済最低位置を前方に移動します。

LCRをインバウンド・サーバーに送信しないXStream Out構成では、クライアント・アプリケーションは別の方法で処理済最低位置を取得する必要があります。

このアプリケーションは、アウトバウンド・サーバーからのトランザクションを無期限に待機します。アプリケーションを中断するには、オペレーティング・システムの割込みコマンドを入力します。たとえば、一部のオペレーティング・システムの割込みコマンドは、control-Cです。プログラムを再起動すると、アウトバウンド・サーバーでは、前回の実行時に設定された処理済最低位置からLCRの送信を開始します。

図A-1はこの項で構成されたXStream環境の概要を示します。

図A-1 サンプルXStream構成

図A-1の説明
「図A-1 サンプルXStream構成」の説明

サンプル・アプリケーションを実行する前に、次が構成されていることを確認してください。

  • 2つのOracleデータベースとそれらの間のネットワーク接続

  • 両方のデータベースでのXStream管理者

  • 1つのデータベースでのアウトバウンド・サーバーの構成(取得プロセス、キューおよびアウトバウンド・サーバーを含む)

  • 別のデータベースでのインバウンド・サーバーの構成

マルチテナント・コンテナ・データベース(CDB)でサンプル・アプリケーションを実行している場合、クライアント・アプリケーションが正しいコンテナに接続していることを確認してください。

  • クライアント・アプリケーションがアウトバウンド・サーバーに接続する場合は、ルートに接続する必要があります。

  • クライアント・アプリケーションがインバウンド・サーバーに接続する場合は、インバウンド・サーバーが作成されたコンテナに接続する必要があります。

以降の項に示すサンプル・アプリケーションでは、同じタスクを実行します。1つのサンプル・アプリケーションはOCI APIを使用し、もう1つはJava APIを使用します。

注意:

Oracle Databaseインストール環境には、複数のXStreamデモが含まれています。これらのデモは次の場所にあります。

$ORACLE_HOME/rdbms/demo/xstream

A.2 Oracle Call Interface APIのサンプルXStreamクライアント・アプリケーション

OCI APIのサンプルXStreamクライアント・アプリケーションを実行するには、アプリケーション・ファイルをコンパイルおよびリンクします。

次に、コマンドラインに次のコマンドを入力します。

xio -ob_svr xout_name -ob_db sn_xout_db -ob_usr xout_cu -ob_pwd xout_cu_pass 
-ib_svr xin_name -ib_db sn_xin_db -ib_usr xin_au -ib_pwd xin_au_pass

次のプレースホルダに適切な値を代入します。

  • xout_nameはアウトバウンド・サーバーの名前です。

  • sn_xout_dbはアウトバウンド・サーバーのデータベースのサービス名です。

  • xout_cuはアウトバウンド・サーバーの接続ユーザーです。

  • xout_cu_passはアウトバウンド・サーバーの接続ユーザーのパスワードです。

  • xin_nameはインバウンド・サーバーの名前です。

  • sn_xin_dbはインバウンド・サーバーのデータベースのサービス名です。

  • xin_auはインバウンド・サーバーの適用ユーザーです。

  • xin_au_passはインバウンド・サーバーの適用ユーザーのパスワードです。

サンプル・クライアント・アプリケーションの実行中は、処理中の行LCRに関する情報が出力されます。出力は、次のようになります。

 ----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=17.0.74
  owner=HR oname=COUNTRIES 
 
 ----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=COMMIT txid=17.0.74
 
 ----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=12.25.77
  owner=OE oname=ORDERS 
 
 ----------- ROW LCR Header  -----------------
  src_db_name=DB.EXAMPLE.COM
  cmd_type=UPDATE txid=12.25.77
  owner=OE oname=ORDERS 

この出力には、各行LCRの次の情報が含まれています。

  • src_db_nameには行LCRにカプセル化された変更のソース・データベースが表示されます。

  • cmd_typeには変更を行ったSQL文のタイプが表示されます。

  • txidには行LCRを含むトランザクションのトランザクションIDが表示されます。

  • ownerには変更されたデータベース・オブジェクトの所有者が表示されます。

  • onameには変更されたデータベース・オブジェクトの名前が表示されます。

このデモは、次の場所にあります。

$ORACLE_HOME/rdbms/demo/xstream/oci

デモ・ファイルの名前は、xio.cです。アプリケーションのコンパイルと実行の詳細は、demoディレクトリにあるREADME.txtファイルを参照してください。

OCI APIを使用するサンプル・アプリケーションのコードは、次のとおりです。

#ifndef OCI_ORACLE
#include <oci.h>
#endif
 
#ifndef _STDIO_H
#include <stdio.h>
#endif
 
#ifndef _STDLIB_H
#include <stdlib.h>
#endif
 
#ifndef _STRING_H
#include <string.h>
#endif
 
#ifndef _MALLOC_H
#include <malloc.h>
#endif
 
/*---------------------------------------------------------------------- 
 *           Internal structures
 *----------------------------------------------------------------------*/ 
 
#define M_DBNAME_LEN    (128)
 
typedef struct conn_info                                     /* connect info */
{
  oratext * user;
  ub4       userlen;
  oratext * passw;
  ub4       passwlen;
  oratext * dbname;
  ub4       dbnamelen;
  oratext * svrnm;
  ub4       svrnmlen;
} conn_info_t;
 
typedef struct params
{
  conn_info_t  xout;                                        /* outbound info */
  conn_info_t  xin;                                          /* inbound info */
} params_t;
 
typedef struct oci                                            /* OCI handles */
{
  OCIEnv      *envp;                                   /* Environment handle */
  OCIError    *errp;                                         /* Error handle */
  OCIServer   *srvp;                                        /* Server handle */
  OCISvcCtx   *svcp;                                       /* Service handle */
  OCISession  *authp;
  OCIStmt    *stmtp;        
  boolean     attached;
  boolean     outbound;
} oci_t;
 
static void connect_db(conn_info_t *opt_params_p, oci_t ** ocip, ub2 char_csid,
                       ub2 nchar_csid);
static void disconnect_db(oci_t * ocip);
static void ocierror(oci_t * ocip, char * msg);
static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound);
static void detach(oci_t *ocip);
static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip);
static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip);
static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, 
                      oratext **src_db_name, ub2  *src_db_namel);
static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty);
static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, 
                       int argc, char ** argv);
static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, 
                            ub2 *nchar_csid);
static void set_client_charset(oci_t *outbound_ocip);
 
#define OCICALL(ocip, function) do {\
sword status=function;\
if (OCI_SUCCESS==status) break;\
else if (OCI_ERROR==status) \
{ocierror(ocip, (char *)"OCI_ERROR");\
exit(1);}\
else {printf("Error encountered %d\n", status);\
exit(1);}\
} while(0)
 
/*---------------------------------------------------------------------
 *                M A I N   P R O G R A M
 *---------------------------------------------------------------------*/
main(int argc, char **argv)
{
  /* Outbound and inbound connection info */
  conn_info_t   xout_params;
  conn_info_t   xin_params;
  oci_t        *xout_ocip = (oci_t *)NULL;
  oci_t        *xin_ocip = (oci_t *)NULL;
  ub2           obdb_char_csid = 0;                 /* outbound db char csid */
  ub2           obdb_nchar_csid = 0;               /* outbound db nchar csid */
 
  /* parse command line arguments */
  get_inputs(&xout_params, &xin_params, argc, argv); 
 
  /* Get the outbound database CHAR and NCHAR character set info */
  get_db_charsets(&xout_params, &obdb_char_csid, &obdb_nchar_csid);
 
  /* Connect to the outbound db and set the client env to the outbound charsets
   * to minimize character conversion when transferring LCRs from outbound 
   * directly to inbound server. 
   */
  connect_db(&xout_params, &xout_ocip, obdb_char_csid, obdb_nchar_csid);
 
  /* Attach to outbound server */
  attach(xout_ocip, &xout_params, TRUE);
 
  /* connect to inbound db and set the client charsets the same as the 
   * outbound db charsets.
   */
  connect_db(&xin_params, &xin_ocip, obdb_char_csid, obdb_nchar_csid);
 
  /* Attach to inbound server */
  attach(xin_ocip, &xin_params, FALSE);
 
  /* Get lcrs from outbound server and send to inbound server */
  get_lcrs(xin_ocip, xout_ocip);
 
  /* Detach from XStream servers */
  detach(xout_ocip);
  detach(xin_ocip);
 
  /* Disconnect from both databases */
  disconnect_db(xout_ocip);
  disconnect_db(xin_ocip);
 
  free(xout_ocip);
  free(xin_ocip);
  exit (0);
}
 
/*---------------------------------------------------------------------
 * connect_db - Connect to the database and set the env to the given
 * char and nchar character set ids. 
 *---------------------------------------------------------------------*/
static void connect_db(conn_info_t *params_p, oci_t **ociptr, ub2 char_csid,
                ub2 nchar_csid)
{
  oci_t        *ocip;
 
  printf ("Connect to Oracle as %.*s@%.*s ",
          params_p->userlen, params_p->user, 
          params_p->dbnamelen, params_p->dbname);
       
  if (char_csid && nchar_csid)
    printf ("using char csid=%d and nchar csid=%d", char_csid, nchar_csid);
 
  printf("\n");
 
  ocip = (oci_t *)malloc(sizeof(oci_t));
 
  if (OCIEnvNlsCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0,
                     (dvoid * (*)(dvoid *, size_t)) 0,
                     (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                     (void (*)(dvoid *, dvoid *)) 0,
                     (size_t) 0, (dvoid **) 0, char_csid, nchar_csid))
  {
    ocierror(ocip, (char *)"OCIEnvCreate() failed");
  }
 
  if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp,
                     (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed");
  }
 
  /* Logon to database */
  OCICALL(ocip,
          OCILogon(ocip->envp, ocip->errp, &ocip->svcp,
                   params_p->user, params_p->userlen,
                   params_p->passw, params_p->passwlen,
                   params_p->dbname, params_p->dbnamelen));
 
  /* allocate the server handle */
  OCICALL(ocip,
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->srvp,
                         OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0));
 
  OCICALL(ocip, 
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp,
                     (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0));
 
  if (*ociptr == (oci_t *)NULL)
  {
    *ociptr = ocip;
  }
}
 
/*---------------------------------------------------------------------
 * get_db_charsets - Get the database CHAR and NCHAR character set ids.
 *---------------------------------------------------------------------*/
static const oratext GET_DB_CHARSETS[] =  \
 "select parameter, value from nls_database_parameters where parameter = \
 'NLS_CHARACTERSET' or parameter = 'NLS_NCHAR_CHARACTERSET'";
 
#define PARM_BUFLEN      (30)
 
static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, 
                            ub2 *nchar_csid)
{
  OCIDefine  *defnp1 = (OCIDefine *) NULL;
  OCIDefine  *defnp2 = (OCIDefine *) NULL;
  oratext     parm[PARM_BUFLEN];
  oratext     value[OCI_NLS_MAXBUFSZ];
  ub2         parm_len = 0;
  ub2         value_len = 0;
  oci_t       ocistruct; 
  oci_t      *ocip = &ocistruct;
   
  *char_csid = 0;
  *nchar_csid = 0;
  memset (ocip, 0, sizeof(ocistruct));
 
  if (OCIEnvCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0,
                     (dvoid * (*)(dvoid *, size_t)) 0,
                     (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                     (void (*)(dvoid *, dvoid *)) 0,
                     (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIEnvCreate() failed");
  }
 
  if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp,
                     (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0))
  {
    ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed");
  }
 
  OCICALL(ocip, 
          OCILogon(ocip->envp, ocip->errp, &ocip->svcp,
                   params_p->user, params_p->userlen,
                   params_p->passw, params_p->passwlen,
                   params_p->dbname, params_p->dbnamelen));
 
  OCICALL(ocip, 
          OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp,
                     (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0));
 
  /* Execute stmt to select the db nls char and nchar character set */ 
  OCICALL(ocip, 
          OCIStmtPrepare(ocip->stmtp, ocip->errp,
                         (CONST text *)GET_DB_CHARSETS,
                         (ub4)strlen((char *)GET_DB_CHARSETS),
                         (ub4)OCI_NTV_SYNTAX, (ub4)OCI_DEFAULT));
 
  OCICALL(ocip,
          OCIDefineByPos(ocip->stmtp, &defnp1,
                         ocip->errp, (ub4) 1, parm,
                         PARM_BUFLEN, SQLT_CHR, (void*) 0,
                         &parm_len, (ub2 *)0, OCI_DEFAULT));
 
  OCICALL(ocip,
          OCIDefineByPos(ocip->stmtp, &defnp2,
                         ocip->errp, (ub4) 2, value,
                         OCI_NLS_MAXBUFSZ, SQLT_CHR, (void*) 0,
                         &value_len, (ub2 *)0, OCI_DEFAULT));
 
  OCICALL(ocip, 
          OCIStmtExecute(ocip->svcp, ocip->stmtp, 
                         ocip->errp, (ub4)0, (ub4)0, 
                         (const OCISnapshot *)0,
                         (OCISnapshot *)0, (ub4)OCI_DEFAULT));
 
  while (OCIStmtFetch(ocip->stmtp, ocip->errp, 1,
                      OCI_FETCH_NEXT, OCI_DEFAULT) == OCI_SUCCESS)
  {
    value[value_len] = '\0';
    if (parm_len == strlen("NLS_CHARACTERSET") &&
        !memcmp(parm, "NLS_CHARACTERSET", parm_len))
    {
      *char_csid = OCINlsCharSetNameToId(ocip->envp, value);
      printf("Outbound database NLS_CHARACTERSET = %.*s (csid = %d) \n",
             value_len, value, *char_csid);
    }
    else if (parm_len == strlen("NLS_NCHAR_CHARACTERSET") &&
             !memcmp(parm, "NLS_NCHAR_CHARACTERSET", parm_len))
    {
      *nchar_csid = OCINlsCharSetNameToId(ocip->envp, value);
      printf("Outbound database NLS_NCHAR_CHARACTERSET = %.*s (csid = %d) \n",
             value_len, value, *nchar_csid);
    }
  }
 
  disconnect_db(ocip);
}
 
/*---------------------------------------------------------------------
 * attach - Attach to XStream server specified in connection info
 *---------------------------------------------------------------------*/
static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound)
{
  sword       err;
 
  printf ("Attach to XStream %s server '%.*s'\n", 
          outbound ? "outbound" : "inbound",
          conn->svrnmlen, conn->svrnm);
 
  if (outbound)
  {
    OCICALL(ocip, 
            OCIXStreamOutAttach(ocip->svcp, ocip->errp, conn->svrnm,
                              (ub2)conn->svrnmlen, (ub1 *)0, 0, OCI_DEFAULT));
  }
  else
  {
    OCICALL(ocip, 
            OCIXStreamInAttach(ocip->svcp, ocip->errp, conn->svrnm,
                               (ub2)conn->svrnmlen, 
                               (oratext *)"From_XOUT", 9,
                               (ub1 *)0, 0, OCI_DEFAULT));
  }
 
  ocip->attached = TRUE;
  ocip->outbound = outbound;
}
 
/*---------------------------------------------------------------------
 * ping_svr - Ping inbound server by sending a commit LCR.
 *---------------------------------------------------------------------*/
static void ping_svr(oci_t *xin_ocip, void *commit_lcr,
                     ub1 *cmtpos, ub2 cmtpos_len, 
                     oratext *source_db, ub2 source_db_len)
{
  OCIDate     src_time;
  oratext     txid[128];
 
  OCICALL(xin_ocip, OCIDateSysDate(xin_ocip->errp, &src_time));
  sprintf((char *)txid, "Ping %2d:%2d:%2d",
          src_time.OCIDateTime.OCITimeHH,
          src_time.OCIDateTime.OCITimeMI,
          src_time.OCIDateTime.OCITimeSS);
 
  /* Initialize LCR with new txid and commit position */
  OCICALL(xin_ocip,
          OCILCRHeaderSet(xin_ocip->svcp, xin_ocip->errp,
                          source_db, source_db_len,
                          (oratext *)OCI_LCR_ROW_CMD_COMMIT,
                          (ub2)strlen(OCI_LCR_ROW_CMD_COMMIT),
                          (oratext *)0, 0,                     /* null owner */
                          (oratext *)0, 0,                    /* null object */
                          (ub1 *)0, 0,                           /* null tag */
                          txid, (ub2)strlen((char *)txid),
                          &src_time, cmtpos, cmtpos_len,
                          0, commit_lcr, OCI_DEFAULT));
 
  /* Send commit lcr to inbound server. */
  if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, commit_lcr,
                          OCI_LCR_XROW, 0, OCI_DEFAULT) == OCI_ERROR)
  {
    ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed in ping_svr()");
  }
}
 
/*---------------------------------------------------------------------
 * get_lcrs - Get LCRs from outbound server and send to inbound server.
 *---------------------------------------------------------------------*/
static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip)
{
  sword       status = OCI_SUCCESS;
  void       *lcr;
  ub1         lcrtype;
  oraub8      flag;
  ub1         proclwm[OCI_LCR_MAX_POSITION_LEN];
  ub2         proclwm_len = 0;
  ub1         sv_pingpos[OCI_LCR_MAX_POSITION_LEN];
  ub2         sv_pingpos_len = 0;
  ub1         fetchlwm[OCI_LCR_MAX_POSITION_LEN];
  ub2         fetchlwm_len = 0;
  void       *commit_lcr = (void *)0;
  oratext    *lcr_srcdb = (oratext *)0;
  ub2         lcr_srcdb_len = 0;
  oratext     source_db[M_DBNAME_LEN];
  ub2         source_db_len = 0;
  ub4         lcrcnt = 0;
 
  /* create an lcr to ping the inbound server periodically by sending a
   * commit lcr.
   */
  commit_lcr = (void*)0;
  OCICALL(xin_ocip,
          OCILCRNew(xin_ocip->svcp, xin_ocip->errp, OCI_DURATION_SESSION,
                    OCI_LCR_XROW, &commit_lcr, OCI_DEFAULT));
 
  while (status == OCI_SUCCESS)
  {
    lcrcnt = 0;                         /* reset lcr count before each batch */
 
    while ((status = 
                OCIXStreamOutLCRReceive(xout_ocip->svcp, xout_ocip->errp,
                                        &lcr, &lcrtype, &flag, 
                                        fetchlwm, &fetchlwm_len, OCI_DEFAULT))
                                               == OCI_STILL_EXECUTING)
    {
      lcrcnt++;
 
      /* print header of LCR just received */
      print_lcr(xout_ocip, lcr, lcrtype, &lcr_srcdb, &lcr_srcdb_len);
 
      /* save the source db to construct ping lcr later */
      if (!source_db_len && lcr_srcdb_len)
      {
        memcpy(source_db, lcr_srcdb, lcr_srcdb_len);
        source_db_len = lcr_srcdb_len;
      }
      
      /* send the LCR just received */
      if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, 
                              lcr, lcrtype, flag, OCI_DEFAULT) == OCI_ERROR)
      {
        ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed");
      }
 
      /* If LCR has chunked columns (i.e, has LOB/Long/XMLType columns) */
      if (flag & OCI_XSTREAM_MORE_ROW_DATA)
      {
        /* receive and send chunked columns */
        get_chunks(xin_ocip, xout_ocip); 
      }
    }
 
    if (status == OCI_ERROR)
      ocierror(xout_ocip, (char *)"OCIXStreamOutLCRReceive failed");
 
    /* clear the saved ping position if we just received some new lcrs */
    if (lcrcnt)
    {
      sv_pingpos_len = 0;
    }
 
    /* If no lcrs received during previous WHILE loop and got a new fetch 
     * LWM then send a commit lcr to ping the inbound server with the new
     * fetch LWM position.
     */
    else if (fetchlwm_len > 0 && source_db_len > 0 &&
        (fetchlwm_len != sv_pingpos_len ||
         memcmp(sv_pingpos, fetchlwm, fetchlwm_len))) 
    {
      /* To ensure we don't send multiple lcrs with duplicate position, send
       * a new ping only if we have saved the last ping position.
       */
      if (sv_pingpos_len > 0)
      {     
        ping_svr(xin_ocip, commit_lcr, fetchlwm, fetchlwm_len,
                 source_db, source_db_len); 
      }
 
      /* save the position just sent to inbound server */
      memcpy(sv_pingpos, fetchlwm, fetchlwm_len);
      sv_pingpos_len = fetchlwm_len;
    }
 
    /* flush inbound network to flush all lcrs to inbound server */
    OCICALL(xin_ocip,
            OCIXStreamInFlush(xin_ocip->svcp, xin_ocip->errp, OCI_DEFAULT));
 
    
    /* get processed LWM of inbound server */   
    OCICALL(xin_ocip, 
            OCIXStreamInProcessedLWMGet(xin_ocip->svcp, xin_ocip->errp,
                                        proclwm, &proclwm_len, OCI_DEFAULT));
 
    if (proclwm_len > 0)
    {
      /* Set processed LWM for outbound server */
      OCICALL(xout_ocip, 
              OCIXStreamOutProcessedLWMSet(xout_ocip->svcp, xout_ocip->errp, 
                                           proclwm, proclwm_len, OCI_DEFAULT));
    }
  }
  
  if (status != OCI_SUCCESS)
    ocierror(xout_ocip, (char *)"get_lcrs() encounters error");
}
 
/*---------------------------------------------------------------------
 * get_chunks - Get each chunk for the current LCR and send it to 
 *              the inbound server.
 *---------------------------------------------------------------------*/
static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip)
{
  oratext *colname;
  ub2      colname_len;
  ub2      coldty;
  oraub8   col_flags;
  ub2      col_csid;
  ub4      chunk_len;
  ub1     *chunk_ptr;
  oraub8   row_flag;
  sword    err;
  sb4      rtncode;
 
  do
  {
    /* Get a chunk from outbound server */
    OCICALL(xout_ocip,
            OCIXStreamOutChunkReceive(xout_ocip->svcp, xout_ocip->errp, 
                                      &colname, &colname_len, &coldty, 
                                      &col_flags, &col_csid, &chunk_len, 
                                      &chunk_ptr, &row_flag, OCI_DEFAULT));
   
    /* print chunked column info */
    printf(
     "  Chunked column name=%.*s DTY=%d  chunk len=%d csid=%d col_flag=0x%lx\n",
      colname_len, colname, coldty, chunk_len, col_csid, col_flags);
 
    /* print chunk data */
    print_chunk(chunk_ptr, chunk_len, coldty);
 
    /* Send the chunk just received to inbound server */
    OCICALL(xin_ocip,
            OCIXStreamInChunkSend(xin_ocip->svcp, xin_ocip->errp, colname,
                                  colname_len, coldty, col_flags,
                                  col_csid, chunk_len, chunk_ptr,
                                  row_flag, OCI_DEFAULT));
 
  } while (row_flag & OCI_XSTREAM_MORE_ROW_DATA);
}
 
/*---------------------------------------------------------------------
 * print_chunk - Print chunked column information. Only print the first
 *               50 bytes for each chunk.
 *---------------------------------------------------------------------*/
static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty)
{
#define MAX_PRINT_BYTES     (50)          /* print max of 50 bytes per chunk */
 
  ub4  print_bytes;
 
  if (chunk_len == 0)
    return;
 
  print_bytes = chunk_len > MAX_PRINT_BYTES ? MAX_PRINT_BYTES : chunk_len;
 
  printf("  Data = ");
  if (dty == SQLT_CHR)
    printf("%.*s", print_bytes, chunk_ptr);
  else
  {
    ub2  idx;
 
    for (idx = 0; idx < print_bytes; idx++)
      printf("%02x", chunk_ptr[idx]);
  }
  printf("\n");
}
 
/*---------------------------------------------------------------------
 * print_lcr - Print header information of given lcr.
 *---------------------------------------------------------------------*/
static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, 
                      oratext **src_db_name, ub2  *src_db_namel)
{
  oratext     *cmd_type;
  ub2          cmd_type_len;
  oratext     *owner;
  ub2          ownerl;
  oratext     *oname;
  ub2          onamel;
  oratext     *txid;
  ub2          txidl;
  sword        ret;
 
  printf("\n ----------- %s LCR Header  -----------------\n",
         lcrtype == OCI_LCR_XDDL ? "DDL" : "ROW");
 
  /* Get LCR Header information */
  ret = OCILCRHeaderGet(ocip->svcp, ocip->errp, 
                        src_db_name, src_db_namel,              /* source db */
                        &cmd_type, &cmd_type_len,            /* command type */
                        &owner, &ownerl,                       /* owner name */
                        &oname, &onamel,                      /* object name */
                        (ub1 **)0, (ub2 *)0,                      /* lcr tag */
                        &txid, &txidl, (OCIDate *)0,   /* txn id  & src time */
                        (ub2 *)0, (ub2 *)0,              /* OLD/NEW col cnts */
                        (ub1 **)0, (ub2 *)0,                 /* LCR position */
                        (oraub8*)0, lcrp, OCI_DEFAULT);
 
  if (ret != OCI_SUCCESS)
    ocierror(ocip, (char *)"OCILCRHeaderGet failed");
  else
  {
    printf("  src_db_name=%.*s\n  cmd_type=%.*s txid=%.*s\n",
           *src_db_namel, *src_db_name, cmd_type_len, cmd_type, txidl, txid );
 
    if (ownerl > 0)
      printf("  owner=%.*s oname=%.*s \n", ownerl, owner, onamel, oname);
  } 
}
 
/*---------------------------------------------------------------------
 * detach - Detach from XStream server
 *---------------------------------------------------------------------*/
static void detach(oci_t * ocip)
{
  sword  err = OCI_SUCCESS;
 
  printf ("Detach from XStream %s server\n",
          ocip->outbound ? "outbound" : "inbound" );
 
  if (ocip->outbound)
  {
    OCICALL(ocip, OCIXStreamOutDetach(ocip->svcp, ocip->errp, OCI_DEFAULT));
  }
  else
  {
    OCICALL(ocip, OCIXStreamInDetach(ocip->svcp, ocip->errp, 
                                     (ub1 *)0, (ub2 *)0,    /* processed LWM */
                                     OCI_DEFAULT));
  }
}
 
/*---------------------------------------------------------------------
 * disconnect_db  - Logoff from the database
 *---------------------------------------------------------------------*/
static void disconnect_db(oci_t * ocip)
{
  if (OCILogoff(ocip->svcp, ocip->errp))
  {
    ocierror(ocip, (char *)"OCILogoff() failed");
  }
 
  if (ocip->errp)
    OCIHandleFree((dvoid *) ocip->errp, (ub4) OCI_HTYPE_ERROR);
 
  if (ocip->envp)
    OCIHandleFree((dvoid *) ocip->envp, (ub4) OCI_HTYPE_ENV);
}
 
/*---------------------------------------------------------------------
 * ocierror - Print error status and exit program
 *---------------------------------------------------------------------*/
static void ocierror(oci_t * ocip, char * msg)
{
  sb4 errcode=0;
  text bufp[4096];
 
  if (ocip->errp)
  {
    OCIErrorGet((dvoid *) ocip->errp, (ub4) 1, (text *) NULL, &errcode,
                bufp, (ub4) 4096, (ub4) OCI_HTYPE_ERROR);
    printf("%s\n%s", msg, bufp);
  }
  else
    puts(msg);
 
  printf ("\n");
  exit(1);
}
 
/*--------------------------------------------------------------------
 * print_usage - Print command usage
 *---------------------------------------------------------------------*/
static void print_usage(int exitcode)
{
  puts("\nUsage: xio -ob_svr <outbound_svr> -ob_db <outbound_db>\n" 
         "           -ob_usr <conn_user> -ob_pwd <conn_user_pwd>\n" 
         "           -ib_svr <inbound_svr> -ib_db <inbound_db>\n"
         "           -ib_usr <apply_user> -ib_pwd <apply_user_pwd>\n");
  puts("  ob_svr  : outbound server name\n"
       "  ob_db   : database name of outbound server\n"
       "  ob_usr  : connect user to outbound server\n"
       "  ob_pwd  : password of outbound's connect user\n"
       "  ib_svr  : inbound server name\n"
       "  ib_db   : database name of inbound server\n"
       "  ib_usr  : apply user for inbound server\n"
       "  ib_pwd  : password of inbound's apply user\n");
 
  exit(exitcode);
}
 
/*--------------------------------------------------------------------
 * get_inputs - Get user inputs from command line
 *---------------------------------------------------------------------*/
static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, 
                       int argc, char ** argv)
{
  char * option;
  char * value;
 
  memset (xout_params, 0, sizeof(*xout_params));
  memset (xin_params, 0, sizeof(*xin_params));
  while(--argc)
  {
    /* get the option name */
    argv++;
    option = *argv;
 
    /* check that the option begins with a "-" */
    if (!strncmp(option, (char *)"-", 1))
    {
      option ++;
    }
    else
    {
      printf("Error: bad argument '%s'\n", option);
      print_usage(1);
    }
 
    /* get the value of the option */
    --argc;
    argv++;
 
    value = *argv;    
 
    if (!strncmp(option, (char *)"ob_db", 5))
    {
      xout_params->dbname = (oratext *)value;
      xout_params->dbnamelen = (ub4)strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_usr", 6))
    {
      xout_params->user = (oratext *)value;
      xout_params->userlen = (ub4)strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_pwd", 6))
    {
      xout_params->passw = (oratext *)value;
      xout_params->passwlen = (ub4)strlen(value);
    }
    else if (!strncmp(option, (char *)"ob_svr", 6))
    {
      xout_params->svrnm = (oratext *)value;
      xout_params->svrnmlen = (ub4)strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_db", 5))
    {
      xin_params->dbname = (oratext *)value;
      xin_params->dbnamelen = (ub4)strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_usr", 6))
    {
      xin_params->user = (oratext *)value;
      xin_params->userlen = (ub4)strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_pwd", 6))
    {
      xin_params->passw = (oratext *)value;
      xin_params->passwlen = (ub4)strlen(value);
    }
    else if (!strncmp(option, (char *)"ib_svr", 6))
    {
      xin_params->svrnm = (oratext *)value;
      xin_params->svrnmlen = (ub4)strlen(value);
    }
    else
    {
      printf("Error: unknown option '%s'.\n", option);
      print_usage(1);
    }
  }
 
  /* print usage and exit if any argument is not specified */
  if (!xout_params->svrnmlen || !xout_params->passwlen || 
      !xout_params->userlen || !xout_params->dbnamelen ||
      !xin_params->svrnmlen || !xin_params->passwlen || 
      !xin_params->userlen || !xin_params->dbnamelen)
  {
    printf("Error: missing command arguments. \n");
    print_usage(1);
  }
}

A.3 Java APIのサンプルXStreamクライアント・アプリケーション

Java APIのサンプルXStreamクライアント・アプリケーションを実行するには、アプリケーション・ファイルをコンパイルおよびリンクします。

次に、コマンドラインに次のコマンドを入力します。

java xio xsin_oraclesid xsin_host xsin_port xsin_username 
xsin_passwd xin_servername xsout_oraclesid xsout_host xsout_port 
xsout_username xsout_passwd xsout_servername

次のプレースホルダに適切な値を代入します。

  • xsin_oraclesidはインバウンド・サーバーのデータベースのOracle SIDです。

  • xsin_hostはインバウンド・サーバーを実行するコンピュータ・システムのホスト名です。

  • xsin_portはインバウンド・サーバーのデータベースのリスナーのポート番号です。

  • xsin_usernameはインバウンド・サーバーの適用ユーザーです。

  • xsin_passwdはインバウンド・サーバーの適用ユーザーのパスワードです。

  • xin_servernameはインバウンド・サーバーの名前です。

  • xsout_oraclesidはアウトバウンド・サーバーのデータベースのOracle SIDです。

  • xsout_hostはアウトバウンド・サーバーを実行するコンピュータ・システムのホスト名です。

  • xsout_portはアウトバウンド・サーバーのデータベースのリスナーのポート番号です。

  • xsout_usernameはアウトバウンド・サーバーの接続ユーザーです。

  • xsout_passwdはアウトバウンド・サーバーの接続ユーザーのパスワードです。

  • xsout_servernameはアウトバウンド・サーバーの名前です。

サンプル・クライアント・アプリケーションの実行中、インバウンド・サーバーおよびアウトバウンド・サーバーへの接続の詳細情報が、各サーバーの最終位置とともに出力されます。出力は、次のようになります。

xsin_host = server2.example.com
xsin_port = 1482
xsin_ora_sid = db2
xsin connection url: jdbc:oracle:oci:@server2.example.com:1482:db2
xsout_host = server1.example.com
xsout_port = 1481
xsout_ora_sid = db1
xsout connection url: jdbc:oracle:oci:@server1.example.com:1481:db1
Attached to inbound server:xin
Inbound Server Last Position is: 0000000920250000000100000001000000092025000000010000000101
Attached to outbound server:xout
Last Position is: 0000000920250000000100000001000000092025000000010000000101

このデモは、次の場所にあります。

$ORACLE_HOME/rdbms/demo/xstream/java

デモ・ファイルの名前は、xio.javaです。アプリケーションのコンパイルと実行の詳細は、demoディレクトリにあるREADME.txtファイルを参照してください。

Java APIを使用するサンプル・アプリケーションのコードは、次のとおりです。

import oracle.streams.*;
import oracle.jdbc.internal.OracleConnection;
import oracle.jdbc.*;
import oracle.sql.*;
import java.sql.*;
import java.util.*;
 
public class xio
{
  public static String xsinusername = null;
  public static String xsinpasswd = null;
  public static String xsinName = null;
  public static String xsoutusername = null;
  public static String xsoutpasswd = null;
  public static String xsoutName = null;
  public static String in_url = null;
  public static String out_url = null;
  public static Connection in_conn = null;
  public static Connection out_conn = null;
  public static XStreamIn xsIn = null;
  public static XStreamOut xsOut = null;
  public static byte[] lastPosition = null;
  public static byte[] processedLowPosition = null;
    
  public static void main(String args[])
  {
    // get connection url to inbound and outbound server
    in_url = parseXSInArguments(args);    
    out_url = parseXSOutArguments(args);    
 
    // create connection to inbound and outbound server
    in_conn = createConnection(in_url, xsinusername, xsinpasswd);
    out_conn = createConnection(out_url, xsoutusername, xsoutpasswd);
 
    // attach to inbound and outbound server
    xsIn = attachInbound(in_conn);
    xsOut = attachOutbound(out_conn);
    
    // main loop to get lcrs 
    get_lcrs(xsIn, xsOut);
    
    // detach from inbound and outbound server
    detachInbound(xsIn);
    detachOutbound(xsOut);
  }
    
  // parse the arguments to get the conncetion url to inbound db
  public static String parseXSInArguments(String args[])
  {
    String trace, pref;
    String orasid, host, port;
    
    if (args.length != 12)
    {
      printUsage();
      System.exit(0);
    }
 
    orasid = args[0];
    host = args[1];
    port = args[2];
    xsinusername = args[3];
    xsinpasswd = args[4];
    xsinName = args[5];
    
    System.out.println("xsin_host = "+host);
    System.out.println("xsin_port = "+port);
    System.out.println("xsin_ora_sid = "+orasid);
 
    String in_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
    System.out.println("xsin connection url: "+ in_url);
 
    return in_url;
  }
 
  // parse the arguments to get the conncetion url to outbound db
  public static String parseXSOutArguments(String args[])
  {
    String trace, pref;
    String orasid, host, port;
    
    if (args.length != 12)
    {
      printUsage();
      System.exit(0);
    }
 
    orasid = args[6];
    host = args[7];
    port = args[8];
    xsoutusername = args[9];
    xsoutpasswd = args[10];
    xsoutName = args[11];
    
    
    System.out.println("xsout_host = "+host);
    System.out.println("xsout_port = "+port);
    System.out.println("xsout_ora_sid = "+orasid);
 
    String out_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid;
    System.out.println("xsout connection url: "+ out_url);
 
    return out_url;
  }
 
  // print out sample program usage message
  public static void printUsage()
  {
    System.out.println("");      
    System.out.println("Usage: java xio "+"<xsin_oraclesid> " + "<xsin_host> "
                                         + "<xsin_port> ");
    System.out.println("                "+"<xsin_username> " + "<xsin_passwd> "
                                         + "<xsin_servername> ");
    System.out.println("                "+"<xsout_oraclesid> " + "<xsout_host> "
                                         + "<xsout_port> ");
    System.out.println("                "+"<xsout_username> " + "<xsout_passwd> "
                                         + "<xsout_servername> ");
  }
 
  // create a connection to an Oracle Database
  public static Connection createConnection(String url, 
                                            String username, 
                                            String passwd)
  {
    try
    {
      DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
      return DriverManager.getConnection(url, username, passwd);
    }
    catch(Exception e)
    {
      System.out.println("fail to establish DB connection to: " +url);
      e.printStackTrace();
      return null;
    }
  }
 
  // attach to the XStream Inbound Server
  public static XStreamIn attachInbound(Connection in_conn)
  {
    XStreamIn xsIn = null;
    try
    {
      xsIn = XStreamIn.attach((OracleConnection)in_conn, xsinName,
                              "XSDEMOINCLIENT" , XStreamIn.DEFAULT_MODE);
 
      // use last position to decide where should we start sending LCRs  
      lastPosition = xsIn.getLastPosition();
      System.out.println("Attached to inbound server:"+xsinName);
      System.out.print("Inbound Server Last Position is: ");
      if (null == lastPosition)
      {
        System.out.println("null");
      }
      else
      {
        printHex(lastPosition);
      }
      return xsIn;
    }
    catch(Exception e)
    {
      System.out.println("cannot attach to inbound server: "+xsinName);
      System.out.println(e.getMessage());
      e.printStackTrace();
      return null;
    }        
  }
 
  // attach to the XStream Outbound Server    
  public static XStreamOut attachOutbound(Connection out_conn)
  {
    XStreamOut xsOut = null;
 
    try
    {
      // when attach to an outbound server, client needs to tell outbound
      // server the last position.
      xsOut = XStreamOut.attach((OracleConnection)out_conn, xsoutName,
                                lastPosition, XStreamOut.DEFAULT_MODE);
      System.out.println("Attached to outbound server:"+xsoutName);
      System.out.print("Last Position is: ");  
      if (lastPosition != null)
      {
        printHex(lastPosition);
      }
      else
      {
        System.out.println("NULL");
      }
      return xsOut;
    }
    catch(Exception e)
    {
      System.out.println("cannot attach to outbound server: "+xsoutName);
      System.out.println(e.getMessage());
      e.printStackTrace();
      return null;
    } 
  }
 
  // detach from the XStream Inbound Server
  public static void detachInbound(XStreamIn xsIn)
  {
    byte[] processedLowPosition = null;
    try
    {
      processedLowPosition = xsIn.detach(XStreamIn.DEFAULT_MODE);
      System.out.print("Inbound server processed low Position is: ");
      if (processedLowPosition != null)
      {
        printHex(processedLowPosition);
      }
      else
      {
        System.out.println("NULL");
      }
    }
    catch(Exception e)
    {
      System.out.println("cannot detach from the inbound server: "+xsinName);
      System.out.println(e.getMessage());
      e.printStackTrace();
    }
  }
 
  // detach from the XStream Outbound Server    
  public static void detachOutbound(XStreamOut xsOut)
  {
    try
    {
      xsOut.detach(XStreamOut.DEFAULT_MODE);
    }
    catch(Exception e)
    {
      System.out.println("cannot detach from the outbound server: "+xsoutName);
      System.out.println(e.getMessage());
      e.printStackTrace();
    }       
  }
 
  public static void get_lcrs(XStreamIn xsIn, XStreamOut xsOut)
  {
    if (null == xsIn) 
    {
      System.out.println("xstreamIn is null");
      System.exit(0);
    }
 
    if (null == xsOut)
    {
      System.out.println("xstreamOut is null");
      System.exit(0);
    }
 
    try
    {
      while(true) 
      {
        // receive an LCR from outbound server
        LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);
 
        if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
        {
          assert alcr != null;
          // send the LCR to the inbound server
          xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
 
          // also get chunk data for this LCR if any
          if (alcr instanceof RowLCR)
          {
            // receive chunk from outbound then send to inbound
            if (((RowLCR)alcr).hasChunkData())
            {
              ChunkColumnValue chunk = null; 
              do
              {
                chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
                xsIn.sendChunk(chunk, XStreamIn.DEFAULT_MODE);
              } while (!chunk.isEndOfRow());
            }
          }
          processedLowPosition = alcr.getPosition();
        }
        else  // batch is end 
        {
          assert alcr == null;
          // flush the network
          xsIn.flush(XStreamIn.DEFAULT_MODE);
          // get the processed_low_position from inbound server
          processedLowPosition = 
              xsIn.getProcessedLowWatermark();
          // update the processed_low_position at oubound server
          if (null != processedLowPosition)
            xsOut.setProcessedLowWatermark(processedLowPosition, 
                                           XStreamOut.DEFAULT_MODE);
        }
      }
    }
    catch(Exception e)
    {
      System.out.println("exception when processing LCRs");
      System.out.println(e.getMessage());
      e.printStackTrace();
    }
  }
 
  public static void printHex(byte[] b) 
  {
    for (int i = 0; i < b.length; ++i) 
    {
      System.out.print(
        Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
    }
    System.out.println("");
  }    
}