A サンプルXStreamクライアント・アプリケーション
XStreamによって使用されるOracle Databaseコンポーネントを構成する方法を例で示します。この例では、XStreamアウトバウンド・サーバーおよびインバウンド・サーバーと通信するサンプル・クライアント・アプリケーションを構成します。
- サンプルXStreamクライアント・アプリケーションについて
サンプルXStreamクライアント・アプリケーションでは、XStream OutおよびXStream Inアプリケーションで必要な基本的なタスクを説明します。 - Oracle Call Interface APIのサンプルXStreamクライアント・アプリケーション
OCI APIのサンプルXStreamクライアント・アプリケーションを実行するには、アプリケーション・ファイルをコンパイルおよびリンクします。 - Java APIのサンプルXStreamクライアント・アプリケーション
Java APIのサンプルXStreamクライアント・アプリケーションを実行するには、アプリケーション・ファイルをコンパイルおよびリンクします。
A.1 サンプルXStreamクライアント・アプリケーションについて
サンプルXStreamクライアント・アプリケーションでは、XStream OutおよびXStream Inアプリケーションで必要な基本的なタスクを説明します。
このアプリケーションは、次のタスクを実行します。
-
XStreamアウトバウンド・サーバーおよびインバウンド・サーバーに接続して、アウトバウンド・サーバーからのLCRを待機します。アウトバウンド・サーバーおよびインバウンド・サーバーは、2つの異なるデータベース内にあります。
-
アウトバウンド・サーバーからLCRを受信すると、すぐにLCRをインバウンド・サーバーに送信します。
-
定期的にインバウンド・サーバーから処理済最低位置を取得し、この値をアウトバウンド・サーバーに送信します。
-
定期的にアウトバウンド・サーバーからインバウンド・サーバーに「ping」LCRを送信し、アクティビティが少ないときに、インバウンド・サーバーの処理済最低位置を前方に移動します。
LCRをインバウンド・サーバーに送信しないXStream Out構成では、クライアント・アプリケーションは別の方法で処理済最低位置を取得する必要があります。
このアプリケーションは、アウトバウンド・サーバーからのトランザクションを無期限に待機します。アプリケーションを中断するには、オペレーティング・システムの割込みコマンドを入力します。たとえば、一部のオペレーティング・システムの割込みコマンドは、control-C
です。プログラムを再起動すると、アウトバウンド・サーバーでは、前回の実行時に設定された処理済最低位置からLCRの送信を開始します。
図A-1はこの項で構成されたXStream環境の概要を示します。
サンプル・アプリケーションを実行する前に、次が構成されていることを確認してください。
-
2つのOracleデータベースとそれらの間のネットワーク接続
-
両方のデータベースでのXStream管理者
-
1つのデータベースでのアウトバウンド・サーバーの構成(取得プロセス、キューおよびアウトバウンド・サーバーを含む)
-
別のデータベースでのインバウンド・サーバーの構成
マルチテナント・コンテナ・データベース(CDB)でサンプル・アプリケーションを実行している場合、クライアント・アプリケーションが正しいコンテナに接続していることを確認してください。
-
クライアント・アプリケーションがアウトバウンド・サーバーに接続する場合は、ルートに接続する必要があります。
-
クライアント・アプリケーションがインバウンド・サーバーに接続する場合は、インバウンド・サーバーが作成されたコンテナに接続する必要があります。
以降の項に示すサンプル・アプリケーションでは、同じタスクを実行します。1つのサンプル・アプリケーションはOCI APIを使用し、もう1つはJava APIを使用します。
注意:
Oracle Databaseインストール環境には、複数のXStreamデモが含まれています。これらのデモは次の場所にあります。
$ORACLE_HOME/rdbms/demo/xstream
A.2 Oracle Call Interface APIのサンプルXStreamクライアント・アプリケーション
OCI APIのサンプルXStreamクライアント・アプリケーションを実行するには、アプリケーション・ファイルをコンパイルおよびリンクします。
次に、コマンドラインに次のコマンドを入力します。
xio -ob_svr xout_name -ob_db sn_xout_db -ob_usr xout_cu -ob_pwd xout_cu_pass -ib_svr xin_name -ib_db sn_xin_db -ib_usr xin_au -ib_pwd xin_au_pass
次のプレースホルダに適切な値を代入します。
-
xout_nameはアウトバウンド・サーバーの名前です。
-
sn_xout_dbはアウトバウンド・サーバーのデータベースのサービス名です。
-
xout_cuはアウトバウンド・サーバーの接続ユーザーです。
-
xout_cu_passはアウトバウンド・サーバーの接続ユーザーのパスワードです。
-
xin_nameはインバウンド・サーバーの名前です。
-
sn_xin_dbはインバウンド・サーバーのデータベースのサービス名です。
-
xin_auはインバウンド・サーバーの適用ユーザーです。
-
xin_au_passはインバウンド・サーバーの適用ユーザーのパスワードです。
サンプル・クライアント・アプリケーションの実行中は、処理中の行LCRに関する情報が出力されます。出力は、次のようになります。
----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=17.0.74 owner=HR oname=COUNTRIES ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=COMMIT txid=17.0.74 ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=12.25.77 owner=OE oname=ORDERS ----------- ROW LCR Header ----------------- src_db_name=DB.EXAMPLE.COM cmd_type=UPDATE txid=12.25.77 owner=OE oname=ORDERS
この出力には、各行LCRの次の情報が含まれています。
-
src_db_name
には行LCRにカプセル化された変更のソース・データベースが表示されます。 -
cmd_type
には変更を行ったSQL文のタイプが表示されます。 -
txid
には行LCRを含むトランザクションのトランザクションIDが表示されます。 -
owner
には変更されたデータベース・オブジェクトの所有者が表示されます。 -
oname
には変更されたデータベース・オブジェクトの名前が表示されます。
このデモは、次の場所にあります。
$ORACLE_HOME/rdbms/demo/xstream/oci
デモ・ファイルの名前は、xio.c
です。アプリケーションのコンパイルと実行の詳細は、demoディレクトリにあるREADME.txt
ファイルを参照してください。
OCI APIを使用するサンプル・アプリケーションのコードは、次のとおりです。
#ifndef OCI_ORACLE #include <oci.h> #endif #ifndef _STDIO_H #include <stdio.h> #endif #ifndef _STDLIB_H #include <stdlib.h> #endif #ifndef _STRING_H #include <string.h> #endif #ifndef _MALLOC_H #include <malloc.h> #endif /*---------------------------------------------------------------------- * Internal structures *----------------------------------------------------------------------*/ #define M_DBNAME_LEN (128) typedef struct conn_info /* connect info */ { oratext * user; ub4 userlen; oratext * passw; ub4 passwlen; oratext * dbname; ub4 dbnamelen; oratext * svrnm; ub4 svrnmlen; } conn_info_t; typedef struct params { conn_info_t xout; /* outbound info */ conn_info_t xin; /* inbound info */ } params_t; typedef struct oci /* OCI handles */ { OCIEnv *envp; /* Environment handle */ OCIError *errp; /* Error handle */ OCIServer *srvp; /* Server handle */ OCISvcCtx *svcp; /* Service handle */ OCISession *authp; OCIStmt *stmtp; boolean attached; boolean outbound; } oci_t; static void connect_db(conn_info_t *opt_params_p, oci_t ** ocip, ub2 char_csid, ub2 nchar_csid); static void disconnect_db(oci_t * ocip); static void ocierror(oci_t * ocip, char * msg); static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound); static void detach(oci_t *ocip); static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip); static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip); static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, oratext **src_db_name, ub2 *src_db_namel); static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty); static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, int argc, char ** argv); static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, ub2 *nchar_csid); static void set_client_charset(oci_t *outbound_ocip); #define OCICALL(ocip, function) do {\ sword status=function;\ if (OCI_SUCCESS==status) break;\ else if (OCI_ERROR==status) \ {ocierror(ocip, (char *)"OCI_ERROR");\ exit(1);}\ else {printf("Error encountered %d\n", status);\ exit(1);}\ } while(0) /*--------------------------------------------------------------------- * M A I N P R O G R A M *---------------------------------------------------------------------*/ main(int argc, char **argv) { /* Outbound and inbound connection info */ conn_info_t xout_params; conn_info_t xin_params; oci_t *xout_ocip = (oci_t *)NULL; oci_t *xin_ocip = (oci_t *)NULL; ub2 obdb_char_csid = 0; /* outbound db char csid */ ub2 obdb_nchar_csid = 0; /* outbound db nchar csid */ /* parse command line arguments */ get_inputs(&xout_params, &xin_params, argc, argv); /* Get the outbound database CHAR and NCHAR character set info */ get_db_charsets(&xout_params, &obdb_char_csid, &obdb_nchar_csid); /* Connect to the outbound db and set the client env to the outbound charsets * to minimize character conversion when transferring LCRs from outbound * directly to inbound server. */ connect_db(&xout_params, &xout_ocip, obdb_char_csid, obdb_nchar_csid); /* Attach to outbound server */ attach(xout_ocip, &xout_params, TRUE); /* connect to inbound db and set the client charsets the same as the * outbound db charsets. */ connect_db(&xin_params, &xin_ocip, obdb_char_csid, obdb_nchar_csid); /* Attach to inbound server */ attach(xin_ocip, &xin_params, FALSE); /* Get lcrs from outbound server and send to inbound server */ get_lcrs(xin_ocip, xout_ocip); /* Detach from XStream servers */ detach(xout_ocip); detach(xin_ocip); /* Disconnect from both databases */ disconnect_db(xout_ocip); disconnect_db(xin_ocip); free(xout_ocip); free(xin_ocip); exit (0); } /*--------------------------------------------------------------------- * connect_db - Connect to the database and set the env to the given * char and nchar character set ids. *---------------------------------------------------------------------*/ static void connect_db(conn_info_t *params_p, oci_t **ociptr, ub2 char_csid, ub2 nchar_csid) { oci_t *ocip; printf ("Connect to Oracle as %.*s@%.*s ", params_p->userlen, params_p->user, params_p->dbnamelen, params_p->dbname); if (char_csid && nchar_csid) printf ("using char csid=%d and nchar csid=%d", char_csid, nchar_csid); printf("\n"); ocip = (oci_t *)malloc(sizeof(oci_t)); if (OCIEnvNlsCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0, (size_t) 0, (dvoid **) 0, char_csid, nchar_csid)) { ocierror(ocip, (char *)"OCIEnvCreate() failed"); } if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp, (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed"); } /* Logon to database */ OCICALL(ocip, OCILogon(ocip->envp, ocip->errp, &ocip->svcp, params_p->user, params_p->userlen, params_p->passw, params_p->passwlen, params_p->dbname, params_p->dbnamelen)); /* allocate the server handle */ OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->srvp, OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0)); OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp, (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)); if (*ociptr == (oci_t *)NULL) { *ociptr = ocip; } } /*--------------------------------------------------------------------- * get_db_charsets - Get the database CHAR and NCHAR character set ids. *---------------------------------------------------------------------*/ static const oratext GET_DB_CHARSETS[] = \ "select parameter, value from nls_database_parameters where parameter = \ 'NLS_CHARACTERSET' or parameter = 'NLS_NCHAR_CHARACTERSET'"; #define PARM_BUFLEN (30) static void get_db_charsets(conn_info_t *params_p, ub2 *char_csid, ub2 *nchar_csid) { OCIDefine *defnp1 = (OCIDefine *) NULL; OCIDefine *defnp2 = (OCIDefine *) NULL; oratext parm[PARM_BUFLEN]; oratext value[OCI_NLS_MAXBUFSZ]; ub2 parm_len = 0; ub2 value_len = 0; oci_t ocistruct; oci_t *ocip = &ocistruct; *char_csid = 0; *nchar_csid = 0; memset (ocip, 0, sizeof(ocistruct)); if (OCIEnvCreate(&ocip->envp, OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIEnvCreate() failed"); } if (OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->errp, (ub4) OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)) { ocierror(ocip, (char *)"OCIHandleAlloc(OCI_HTYPE_ERROR) failed"); } OCICALL(ocip, OCILogon(ocip->envp, ocip->errp, &ocip->svcp, params_p->user, params_p->userlen, params_p->passw, params_p->passwlen, params_p->dbname, params_p->dbnamelen)); OCICALL(ocip, OCIHandleAlloc((dvoid *) ocip->envp, (dvoid **) &ocip->stmtp, (ub4) OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0)); /* Execute stmt to select the db nls char and nchar character set */ OCICALL(ocip, OCIStmtPrepare(ocip->stmtp, ocip->errp, (CONST text *)GET_DB_CHARSETS, (ub4)strlen((char *)GET_DB_CHARSETS), (ub4)OCI_NTV_SYNTAX, (ub4)OCI_DEFAULT)); OCICALL(ocip, OCIDefineByPos(ocip->stmtp, &defnp1, ocip->errp, (ub4) 1, parm, PARM_BUFLEN, SQLT_CHR, (void*) 0, &parm_len, (ub2 *)0, OCI_DEFAULT)); OCICALL(ocip, OCIDefineByPos(ocip->stmtp, &defnp2, ocip->errp, (ub4) 2, value, OCI_NLS_MAXBUFSZ, SQLT_CHR, (void*) 0, &value_len, (ub2 *)0, OCI_DEFAULT)); OCICALL(ocip, OCIStmtExecute(ocip->svcp, ocip->stmtp, ocip->errp, (ub4)0, (ub4)0, (const OCISnapshot *)0, (OCISnapshot *)0, (ub4)OCI_DEFAULT)); while (OCIStmtFetch(ocip->stmtp, ocip->errp, 1, OCI_FETCH_NEXT, OCI_DEFAULT) == OCI_SUCCESS) { value[value_len] = '\0'; if (parm_len == strlen("NLS_CHARACTERSET") && !memcmp(parm, "NLS_CHARACTERSET", parm_len)) { *char_csid = OCINlsCharSetNameToId(ocip->envp, value); printf("Outbound database NLS_CHARACTERSET = %.*s (csid = %d) \n", value_len, value, *char_csid); } else if (parm_len == strlen("NLS_NCHAR_CHARACTERSET") && !memcmp(parm, "NLS_NCHAR_CHARACTERSET", parm_len)) { *nchar_csid = OCINlsCharSetNameToId(ocip->envp, value); printf("Outbound database NLS_NCHAR_CHARACTERSET = %.*s (csid = %d) \n", value_len, value, *nchar_csid); } } disconnect_db(ocip); } /*--------------------------------------------------------------------- * attach - Attach to XStream server specified in connection info *---------------------------------------------------------------------*/ static void attach(oci_t * ocip, conn_info_t *conn, boolean outbound) { sword err; printf ("Attach to XStream %s server '%.*s'\n", outbound ? "outbound" : "inbound", conn->svrnmlen, conn->svrnm); if (outbound) { OCICALL(ocip, OCIXStreamOutAttach(ocip->svcp, ocip->errp, conn->svrnm, (ub2)conn->svrnmlen, (ub1 *)0, 0, OCI_DEFAULT)); } else { OCICALL(ocip, OCIXStreamInAttach(ocip->svcp, ocip->errp, conn->svrnm, (ub2)conn->svrnmlen, (oratext *)"From_XOUT", 9, (ub1 *)0, 0, OCI_DEFAULT)); } ocip->attached = TRUE; ocip->outbound = outbound; } /*--------------------------------------------------------------------- * ping_svr - Ping inbound server by sending a commit LCR. *---------------------------------------------------------------------*/ static void ping_svr(oci_t *xin_ocip, void *commit_lcr, ub1 *cmtpos, ub2 cmtpos_len, oratext *source_db, ub2 source_db_len) { OCIDate src_time; oratext txid[128]; OCICALL(xin_ocip, OCIDateSysDate(xin_ocip->errp, &src_time)); sprintf((char *)txid, "Ping %2d:%2d:%2d", src_time.OCIDateTime.OCITimeHH, src_time.OCIDateTime.OCITimeMI, src_time.OCIDateTime.OCITimeSS); /* Initialize LCR with new txid and commit position */ OCICALL(xin_ocip, OCILCRHeaderSet(xin_ocip->svcp, xin_ocip->errp, source_db, source_db_len, (oratext *)OCI_LCR_ROW_CMD_COMMIT, (ub2)strlen(OCI_LCR_ROW_CMD_COMMIT), (oratext *)0, 0, /* null owner */ (oratext *)0, 0, /* null object */ (ub1 *)0, 0, /* null tag */ txid, (ub2)strlen((char *)txid), &src_time, cmtpos, cmtpos_len, 0, commit_lcr, OCI_DEFAULT)); /* Send commit lcr to inbound server. */ if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, commit_lcr, OCI_LCR_XROW, 0, OCI_DEFAULT) == OCI_ERROR) { ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed in ping_svr()"); } } /*--------------------------------------------------------------------- * get_lcrs - Get LCRs from outbound server and send to inbound server. *---------------------------------------------------------------------*/ static void get_lcrs(oci_t *xin_ocip, oci_t *xout_ocip) { sword status = OCI_SUCCESS; void *lcr; ub1 lcrtype; oraub8 flag; ub1 proclwm[OCI_LCR_MAX_POSITION_LEN]; ub2 proclwm_len = 0; ub1 sv_pingpos[OCI_LCR_MAX_POSITION_LEN]; ub2 sv_pingpos_len = 0; ub1 fetchlwm[OCI_LCR_MAX_POSITION_LEN]; ub2 fetchlwm_len = 0; void *commit_lcr = (void *)0; oratext *lcr_srcdb = (oratext *)0; ub2 lcr_srcdb_len = 0; oratext source_db[M_DBNAME_LEN]; ub2 source_db_len = 0; ub4 lcrcnt = 0; /* create an lcr to ping the inbound server periodically by sending a * commit lcr. */ commit_lcr = (void*)0; OCICALL(xin_ocip, OCILCRNew(xin_ocip->svcp, xin_ocip->errp, OCI_DURATION_SESSION, OCI_LCR_XROW, &commit_lcr, OCI_DEFAULT)); while (status == OCI_SUCCESS) { lcrcnt = 0; /* reset lcr count before each batch */ while ((status = OCIXStreamOutLCRReceive(xout_ocip->svcp, xout_ocip->errp, &lcr, &lcrtype, &flag, fetchlwm, &fetchlwm_len, OCI_DEFAULT)) == OCI_STILL_EXECUTING) { lcrcnt++; /* print header of LCR just received */ print_lcr(xout_ocip, lcr, lcrtype, &lcr_srcdb, &lcr_srcdb_len); /* save the source db to construct ping lcr later */ if (!source_db_len && lcr_srcdb_len) { memcpy(source_db, lcr_srcdb, lcr_srcdb_len); source_db_len = lcr_srcdb_len; } /* send the LCR just received */ if (OCIXStreamInLCRSend(xin_ocip->svcp, xin_ocip->errp, lcr, lcrtype, flag, OCI_DEFAULT) == OCI_ERROR) { ocierror(xin_ocip, (char *)"OCIXStreamInLCRSend failed"); } /* If LCR has chunked columns (i.e, has LOB/Long/XMLType columns) */ if (flag & OCI_XSTREAM_MORE_ROW_DATA) { /* receive and send chunked columns */ get_chunks(xin_ocip, xout_ocip); } } if (status == OCI_ERROR) ocierror(xout_ocip, (char *)"OCIXStreamOutLCRReceive failed"); /* clear the saved ping position if we just received some new lcrs */ if (lcrcnt) { sv_pingpos_len = 0; } /* If no lcrs received during previous WHILE loop and got a new fetch * LWM then send a commit lcr to ping the inbound server with the new * fetch LWM position. */ else if (fetchlwm_len > 0 && source_db_len > 0 && (fetchlwm_len != sv_pingpos_len || memcmp(sv_pingpos, fetchlwm, fetchlwm_len))) { /* To ensure we don't send multiple lcrs with duplicate position, send * a new ping only if we have saved the last ping position. */ if (sv_pingpos_len > 0) { ping_svr(xin_ocip, commit_lcr, fetchlwm, fetchlwm_len, source_db, source_db_len); } /* save the position just sent to inbound server */ memcpy(sv_pingpos, fetchlwm, fetchlwm_len); sv_pingpos_len = fetchlwm_len; } /* flush inbound network to flush all lcrs to inbound server */ OCICALL(xin_ocip, OCIXStreamInFlush(xin_ocip->svcp, xin_ocip->errp, OCI_DEFAULT)); /* get processed LWM of inbound server */ OCICALL(xin_ocip, OCIXStreamInProcessedLWMGet(xin_ocip->svcp, xin_ocip->errp, proclwm, &proclwm_len, OCI_DEFAULT)); if (proclwm_len > 0) { /* Set processed LWM for outbound server */ OCICALL(xout_ocip, OCIXStreamOutProcessedLWMSet(xout_ocip->svcp, xout_ocip->errp, proclwm, proclwm_len, OCI_DEFAULT)); } } if (status != OCI_SUCCESS) ocierror(xout_ocip, (char *)"get_lcrs() encounters error"); } /*--------------------------------------------------------------------- * get_chunks - Get each chunk for the current LCR and send it to * the inbound server. *---------------------------------------------------------------------*/ static void get_chunks(oci_t *xin_ocip, oci_t *xout_ocip) { oratext *colname; ub2 colname_len; ub2 coldty; oraub8 col_flags; ub2 col_csid; ub4 chunk_len; ub1 *chunk_ptr; oraub8 row_flag; sword err; sb4 rtncode; do { /* Get a chunk from outbound server */ OCICALL(xout_ocip, OCIXStreamOutChunkReceive(xout_ocip->svcp, xout_ocip->errp, &colname, &colname_len, &coldty, &col_flags, &col_csid, &chunk_len, &chunk_ptr, &row_flag, OCI_DEFAULT)); /* print chunked column info */ printf( " Chunked column name=%.*s DTY=%d chunk len=%d csid=%d col_flag=0x%lx\n", colname_len, colname, coldty, chunk_len, col_csid, col_flags); /* print chunk data */ print_chunk(chunk_ptr, chunk_len, coldty); /* Send the chunk just received to inbound server */ OCICALL(xin_ocip, OCIXStreamInChunkSend(xin_ocip->svcp, xin_ocip->errp, colname, colname_len, coldty, col_flags, col_csid, chunk_len, chunk_ptr, row_flag, OCI_DEFAULT)); } while (row_flag & OCI_XSTREAM_MORE_ROW_DATA); } /*--------------------------------------------------------------------- * print_chunk - Print chunked column information. Only print the first * 50 bytes for each chunk. *---------------------------------------------------------------------*/ static void print_chunk (ub1 *chunk_ptr, ub4 chunk_len, ub2 dty) { #define MAX_PRINT_BYTES (50) /* print max of 50 bytes per chunk */ ub4 print_bytes; if (chunk_len == 0) return; print_bytes = chunk_len > MAX_PRINT_BYTES ? MAX_PRINT_BYTES : chunk_len; printf(" Data = "); if (dty == SQLT_CHR) printf("%.*s", print_bytes, chunk_ptr); else { ub2 idx; for (idx = 0; idx < print_bytes; idx++) printf("%02x", chunk_ptr[idx]); } printf("\n"); } /*--------------------------------------------------------------------- * print_lcr - Print header information of given lcr. *---------------------------------------------------------------------*/ static void print_lcr(oci_t *ocip, void *lcrp, ub1 lcrtype, oratext **src_db_name, ub2 *src_db_namel) { oratext *cmd_type; ub2 cmd_type_len; oratext *owner; ub2 ownerl; oratext *oname; ub2 onamel; oratext *txid; ub2 txidl; sword ret; printf("\n ----------- %s LCR Header -----------------\n", lcrtype == OCI_LCR_XDDL ? "DDL" : "ROW"); /* Get LCR Header information */ ret = OCILCRHeaderGet(ocip->svcp, ocip->errp, src_db_name, src_db_namel, /* source db */ &cmd_type, &cmd_type_len, /* command type */ &owner, &ownerl, /* owner name */ &oname, &onamel, /* object name */ (ub1 **)0, (ub2 *)0, /* lcr tag */ &txid, &txidl, (OCIDate *)0, /* txn id & src time */ (ub2 *)0, (ub2 *)0, /* OLD/NEW col cnts */ (ub1 **)0, (ub2 *)0, /* LCR position */ (oraub8*)0, lcrp, OCI_DEFAULT); if (ret != OCI_SUCCESS) ocierror(ocip, (char *)"OCILCRHeaderGet failed"); else { printf(" src_db_name=%.*s\n cmd_type=%.*s txid=%.*s\n", *src_db_namel, *src_db_name, cmd_type_len, cmd_type, txidl, txid ); if (ownerl > 0) printf(" owner=%.*s oname=%.*s \n", ownerl, owner, onamel, oname); } } /*--------------------------------------------------------------------- * detach - Detach from XStream server *---------------------------------------------------------------------*/ static void detach(oci_t * ocip) { sword err = OCI_SUCCESS; printf ("Detach from XStream %s server\n", ocip->outbound ? "outbound" : "inbound" ); if (ocip->outbound) { OCICALL(ocip, OCIXStreamOutDetach(ocip->svcp, ocip->errp, OCI_DEFAULT)); } else { OCICALL(ocip, OCIXStreamInDetach(ocip->svcp, ocip->errp, (ub1 *)0, (ub2 *)0, /* processed LWM */ OCI_DEFAULT)); } } /*--------------------------------------------------------------------- * disconnect_db - Logoff from the database *---------------------------------------------------------------------*/ static void disconnect_db(oci_t * ocip) { if (OCILogoff(ocip->svcp, ocip->errp)) { ocierror(ocip, (char *)"OCILogoff() failed"); } if (ocip->errp) OCIHandleFree((dvoid *) ocip->errp, (ub4) OCI_HTYPE_ERROR); if (ocip->envp) OCIHandleFree((dvoid *) ocip->envp, (ub4) OCI_HTYPE_ENV); } /*--------------------------------------------------------------------- * ocierror - Print error status and exit program *---------------------------------------------------------------------*/ static void ocierror(oci_t * ocip, char * msg) { sb4 errcode=0; text bufp[4096]; if (ocip->errp) { OCIErrorGet((dvoid *) ocip->errp, (ub4) 1, (text *) NULL, &errcode, bufp, (ub4) 4096, (ub4) OCI_HTYPE_ERROR); printf("%s\n%s", msg, bufp); } else puts(msg); printf ("\n"); exit(1); } /*-------------------------------------------------------------------- * print_usage - Print command usage *---------------------------------------------------------------------*/ static void print_usage(int exitcode) { puts("\nUsage: xio -ob_svr <outbound_svr> -ob_db <outbound_db>\n" " -ob_usr <conn_user> -ob_pwd <conn_user_pwd>\n" " -ib_svr <inbound_svr> -ib_db <inbound_db>\n" " -ib_usr <apply_user> -ib_pwd <apply_user_pwd>\n"); puts(" ob_svr : outbound server name\n" " ob_db : database name of outbound server\n" " ob_usr : connect user to outbound server\n" " ob_pwd : password of outbound's connect user\n" " ib_svr : inbound server name\n" " ib_db : database name of inbound server\n" " ib_usr : apply user for inbound server\n" " ib_pwd : password of inbound's apply user\n"); exit(exitcode); } /*-------------------------------------------------------------------- * get_inputs - Get user inputs from command line *---------------------------------------------------------------------*/ static void get_inputs(conn_info_t *xout_params, conn_info_t *xin_params, int argc, char ** argv) { char * option; char * value; memset (xout_params, 0, sizeof(*xout_params)); memset (xin_params, 0, sizeof(*xin_params)); while(--argc) { /* get the option name */ argv++; option = *argv; /* check that the option begins with a "-" */ if (!strncmp(option, (char *)"-", 1)) { option ++; } else { printf("Error: bad argument '%s'\n", option); print_usage(1); } /* get the value of the option */ --argc; argv++; value = *argv; if (!strncmp(option, (char *)"ob_db", 5)) { xout_params->dbname = (oratext *)value; xout_params->dbnamelen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ob_usr", 6)) { xout_params->user = (oratext *)value; xout_params->userlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ob_pwd", 6)) { xout_params->passw = (oratext *)value; xout_params->passwlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ob_svr", 6)) { xout_params->svrnm = (oratext *)value; xout_params->svrnmlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_db", 5)) { xin_params->dbname = (oratext *)value; xin_params->dbnamelen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_usr", 6)) { xin_params->user = (oratext *)value; xin_params->userlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_pwd", 6)) { xin_params->passw = (oratext *)value; xin_params->passwlen = (ub4)strlen(value); } else if (!strncmp(option, (char *)"ib_svr", 6)) { xin_params->svrnm = (oratext *)value; xin_params->svrnmlen = (ub4)strlen(value); } else { printf("Error: unknown option '%s'.\n", option); print_usage(1); } } /* print usage and exit if any argument is not specified */ if (!xout_params->svrnmlen || !xout_params->passwlen || !xout_params->userlen || !xout_params->dbnamelen || !xin_params->svrnmlen || !xin_params->passwlen || !xin_params->userlen || !xin_params->dbnamelen) { printf("Error: missing command arguments. \n"); print_usage(1); } }
A.3 Java APIのサンプルXStreamクライアント・アプリケーション
Java APIのサンプルXStreamクライアント・アプリケーションを実行するには、アプリケーション・ファイルをコンパイルおよびリンクします。
次に、コマンドラインに次のコマンドを入力します。
java xio xsin_oraclesid xsin_host xsin_port xsin_username xsin_passwd xin_servername xsout_oraclesid xsout_host xsout_port xsout_username xsout_passwd xsout_servername
次のプレースホルダに適切な値を代入します。
-
xsin_oraclesidはインバウンド・サーバーのデータベースのOracle SIDです。
-
xsin_hostはインバウンド・サーバーを実行するコンピュータ・システムのホスト名です。
-
xsin_portはインバウンド・サーバーのデータベースのリスナーのポート番号です。
-
xsin_usernameはインバウンド・サーバーの適用ユーザーです。
-
xsin_passwdはインバウンド・サーバーの適用ユーザーのパスワードです。
-
xin_servernameはインバウンド・サーバーの名前です。
-
xsout_oraclesidはアウトバウンド・サーバーのデータベースのOracle SIDです。
-
xsout_hostはアウトバウンド・サーバーを実行するコンピュータ・システムのホスト名です。
-
xsout_portはアウトバウンド・サーバーのデータベースのリスナーのポート番号です。
-
xsout_usernameはアウトバウンド・サーバーの接続ユーザーです。
-
xsout_passwdはアウトバウンド・サーバーの接続ユーザーのパスワードです。
-
xsout_servernameはアウトバウンド・サーバーの名前です。
サンプル・クライアント・アプリケーションの実行中、インバウンド・サーバーおよびアウトバウンド・サーバーへの接続の詳細情報が、各サーバーの最終位置とともに出力されます。出力は、次のようになります。
xsin_host = server2.example.com xsin_port = 1482 xsin_ora_sid = db2 xsin connection url: jdbc:oracle:oci:@server2.example.com:1482:db2 xsout_host = server1.example.com xsout_port = 1481 xsout_ora_sid = db1 xsout connection url: jdbc:oracle:oci:@server1.example.com:1481:db1 Attached to inbound server:xin Inbound Server Last Position is: 0000000920250000000100000001000000092025000000010000000101 Attached to outbound server:xout Last Position is: 0000000920250000000100000001000000092025000000010000000101
このデモは、次の場所にあります。
$ORACLE_HOME/rdbms/demo/xstream/java
デモ・ファイルの名前は、xio.java
です。アプリケーションのコンパイルと実行の詳細は、demoディレクトリにあるREADME.txt
ファイルを参照してください。
Java APIを使用するサンプル・アプリケーションのコードは、次のとおりです。
import oracle.streams.*; import oracle.jdbc.internal.OracleConnection; import oracle.jdbc.*; import oracle.sql.*; import java.sql.*; import java.util.*; public class xio { public static String xsinusername = null; public static String xsinpasswd = null; public static String xsinName = null; public static String xsoutusername = null; public static String xsoutpasswd = null; public static String xsoutName = null; public static String in_url = null; public static String out_url = null; public static Connection in_conn = null; public static Connection out_conn = null; public static XStreamIn xsIn = null; public static XStreamOut xsOut = null; public static byte[] lastPosition = null; public static byte[] processedLowPosition = null; public static void main(String args[]) { // get connection url to inbound and outbound server in_url = parseXSInArguments(args); out_url = parseXSOutArguments(args); // create connection to inbound and outbound server in_conn = createConnection(in_url, xsinusername, xsinpasswd); out_conn = createConnection(out_url, xsoutusername, xsoutpasswd); // attach to inbound and outbound server xsIn = attachInbound(in_conn); xsOut = attachOutbound(out_conn); // main loop to get lcrs get_lcrs(xsIn, xsOut); // detach from inbound and outbound server detachInbound(xsIn); detachOutbound(xsOut); } // parse the arguments to get the conncetion url to inbound db public static String parseXSInArguments(String args[]) { String trace, pref; String orasid, host, port; if (args.length != 12) { printUsage(); System.exit(0); } orasid = args[0]; host = args[1]; port = args[2]; xsinusername = args[3]; xsinpasswd = args[4]; xsinName = args[5]; System.out.println("xsin_host = "+host); System.out.println("xsin_port = "+port); System.out.println("xsin_ora_sid = "+orasid); String in_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid; System.out.println("xsin connection url: "+ in_url); return in_url; } // parse the arguments to get the conncetion url to outbound db public static String parseXSOutArguments(String args[]) { String trace, pref; String orasid, host, port; if (args.length != 12) { printUsage(); System.exit(0); } orasid = args[6]; host = args[7]; port = args[8]; xsoutusername = args[9]; xsoutpasswd = args[10]; xsoutName = args[11]; System.out.println("xsout_host = "+host); System.out.println("xsout_port = "+port); System.out.println("xsout_ora_sid = "+orasid); String out_url = "jdbc:oracle:oci:@"+host+":"+port+":"+orasid; System.out.println("xsout connection url: "+ out_url); return out_url; } // print out sample program usage message public static void printUsage() { System.out.println(""); System.out.println("Usage: java xio "+"<xsin_oraclesid> " + "<xsin_host> " + "<xsin_port> "); System.out.println(" "+"<xsin_username> " + "<xsin_passwd> " + "<xsin_servername> "); System.out.println(" "+"<xsout_oraclesid> " + "<xsout_host> " + "<xsout_port> "); System.out.println(" "+"<xsout_username> " + "<xsout_passwd> " + "<xsout_servername> "); } // create a connection to an Oracle Database public static Connection createConnection(String url, String username, String passwd) { try { DriverManager.registerDriver(new oracle.jdbc.OracleDriver()); return DriverManager.getConnection(url, username, passwd); } catch(Exception e) { System.out.println("fail to establish DB connection to: " +url); e.printStackTrace(); return null; } } // attach to the XStream Inbound Server public static XStreamIn attachInbound(Connection in_conn) { XStreamIn xsIn = null; try { xsIn = XStreamIn.attach((OracleConnection)in_conn, xsinName, "XSDEMOINCLIENT" , XStreamIn.DEFAULT_MODE); // use last position to decide where should we start sending LCRs lastPosition = xsIn.getLastPosition(); System.out.println("Attached to inbound server:"+xsinName); System.out.print("Inbound Server Last Position is: "); if (null == lastPosition) { System.out.println("null"); } else { printHex(lastPosition); } return xsIn; } catch(Exception e) { System.out.println("cannot attach to inbound server: "+xsinName); System.out.println(e.getMessage()); e.printStackTrace(); return null; } } // attach to the XStream Outbound Server public static XStreamOut attachOutbound(Connection out_conn) { XStreamOut xsOut = null; try { // when attach to an outbound server, client needs to tell outbound // server the last position. xsOut = XStreamOut.attach((OracleConnection)out_conn, xsoutName, lastPosition, XStreamOut.DEFAULT_MODE); System.out.println("Attached to outbound server:"+xsoutName); System.out.print("Last Position is: "); if (lastPosition != null) { printHex(lastPosition); } else { System.out.println("NULL"); } return xsOut; } catch(Exception e) { System.out.println("cannot attach to outbound server: "+xsoutName); System.out.println(e.getMessage()); e.printStackTrace(); return null; } } // detach from the XStream Inbound Server public static void detachInbound(XStreamIn xsIn) { byte[] processedLowPosition = null; try { processedLowPosition = xsIn.detach(XStreamIn.DEFAULT_MODE); System.out.print("Inbound server processed low Position is: "); if (processedLowPosition != null) { printHex(processedLowPosition); } else { System.out.println("NULL"); } } catch(Exception e) { System.out.println("cannot detach from the inbound server: "+xsinName); System.out.println(e.getMessage()); e.printStackTrace(); } } // detach from the XStream Outbound Server public static void detachOutbound(XStreamOut xsOut) { try { xsOut.detach(XStreamOut.DEFAULT_MODE); } catch(Exception e) { System.out.println("cannot detach from the outbound server: "+xsoutName); System.out.println(e.getMessage()); e.printStackTrace(); } } public static void get_lcrs(XStreamIn xsIn, XStreamOut xsOut) { if (null == xsIn) { System.out.println("xstreamIn is null"); System.exit(0); } if (null == xsOut) { System.out.println("xstreamOut is null"); System.exit(0); } try { while(true) { // receive an LCR from outbound server LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE); if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active { assert alcr != null; // send the LCR to the inbound server xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE); // also get chunk data for this LCR if any if (alcr instanceof RowLCR) { // receive chunk from outbound then send to inbound if (((RowLCR)alcr).hasChunkData()) { ChunkColumnValue chunk = null; do { chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE); xsIn.sendChunk(chunk, XStreamIn.DEFAULT_MODE); } while (!chunk.isEndOfRow()); } } processedLowPosition = alcr.getPosition(); } else // batch is end { assert alcr == null; // flush the network xsIn.flush(XStreamIn.DEFAULT_MODE); // get the processed_low_position from inbound server processedLowPosition = xsIn.getProcessedLowWatermark(); // update the processed_low_position at oubound server if (null != processedLowPosition) xsOut.setProcessedLowWatermark(processedLowPosition, XStreamOut.DEFAULT_MODE); } } } catch(Exception e) { System.out.println("exception when processing LCRs"); System.out.println(e.getMessage()); e.printStackTrace(); } } public static void printHex(byte[] b) { for (int i = 0; i < b.length; ++i) { System.out.print( Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3)); } System.out.println(""); } }