この章では、トランザクション・フレームワークAPIを使用して、C++クライアント使用時にトランザクション内で必ずキャッシュ操作が実行されるようにする手順について説明します。この手順では、トランザクションAPIの詳細な使用方法は説明しません。トランザクションAPIの使用方法の詳細は、『Oracle Coherence開発者ガイド』のトランザクション・フレームワークAPIの使用方法に関する項を参照してください。
この章は、トランザクションの実行に必要な次の各項で構成されています。
C++クライアントは、トランザクション・フレームワークAPIを利用することによって、トランザクション内でキャッシュ操作を実行します。トランザクションAPIは、C++でネイティブではサポートされていないため、入力プロセッサ内で使用する必要があります。入力プロセッサはJavaでクラスタ上に実装され、入力プロセッサのスタブ・クラスはC++でクライアント上に実装されます。いずれのクラスでも、POFを使用して、JavaとC++の間でシリアライズを行います。
例15-1は、トランザクションAPIを使用してトランザクション内で単純なupdate操作を実行する入力プロセッサを示しています。実行時には、拡張プロキシ・サーバーのクラスパス上にクラスを配置する必要があります。
例15-1 拡張クライアント・トランザクション用の入力プロセッサ
package coherence.tests;
import com.tangosol.coherence.transaction.Connection;
import com.tangosol.coherence.transaction.ConnectionFactory;
import com.tangosol.coherence.transaction.DefaultConnectionFactory;
import com.tangosol.coherence.transaction.OptimisticNamedCache;
import
com.tangosol.coherence.transaction.exception.PredicateFailedException;
import com.tangosol.coherence.transaction.exception.RollbackException;
import
com.tangosol.coherence.transaction.exception.UnableToAcquireLockException;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.extractor.IdentityExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;
public class MyTxProcessor extends AbstractProcessor implements PortableObject
{
public Object process(InvocableMap.Entry entry)
{
// obtain a connection and transaction cache
ConnectionFactory connFactory = new DefaultConnectionFactory();
Connection conn = connFactory.createConnection("TransactionalCache");
OptimisticNamedCache cache = conn.getNamedCache("MyTxCache");
conn.setAutoCommit(false);
// get a value for an existing entry
String sValue = (String) cache.get("existingEntry");
// create predicate filter
Filter predicate = new EqualsFilter(IdentityExtractor.INSTANCE, sValue);
try
{
// update the previously obtained value
cache.update("existingEntry", "newValue", predicate);
}
catch (PredicateFailedException e)
{
// value was updated after it was read
conn.rollback();
return false;
}
catch (UnableToAcquireLockException e)
{
// row is being updated by another tranaction
conn.rollback();
return false;
}
try
{
conn.commit();
}
catch (RollbackException e)
{
// transaction was rolled back
return false;
}
return true;
}
public void readExternal(PofReader in)
throws IOException
{
}
public void writeExternal(PofWriter out)
throws IOException
{
}
}
入力プロセッサのスタブ・クラスを使用すると、クライアントはクラスタ上でトランザクション入力プロセッサを利用できます。スタブ・クラスはC++で実装され、POFを使用してシリアライズを行います。POFを使用することにより、C++とJavaの間で入力プロセッサをシリアライズできます。入力プロセッサのスタブ・クラスにはトランザクション・ロジックは必要ありません。これはトランザクション入力プロセッサのスケルトンです。C++でPOFを使用する際の詳細は、第10章「統合オブジェクトの構築(C++)」を参照してください。
例15-2および例15-3は、スタブ・クラスと、例15-1で作成したトランザクション入力プロセッサの関連ヘッダー・ファイルを示しています。この例では、POFの登録はクラス内で実行されます。
例15-2 トランザクション入力プロセッサのC++スタブ・クラス
#include "coherence/tests/MyTxProcessor.hpp"
#include "coherence/io/pof/SystemPofContext.hpp"
COH_OPEN_NAMESPACE2(coherence,tests)
COH_REGISTER_PORTABLE_CLASS(1599, MyTxProcessor);
MyTxProcessor::MyTxProcessor()
{
}
void MyTxProcessor::readExternal(PofReader::Handle hIn)
{
}
void MyTxProcessor::writeExternal(PofWriter::Handle hOut) const
{
}
Object::Holder MyTxProcessor::process(InvocableMap::Entry::Handle hEntry) const
{
return NULL;
}
COH_CLOSE_NAMESPACE2
例15-3 トランザクション入力プロセッサのC++スタブ・クラス・ヘッダー・ファイル
#ifndef COH_TX_EP_HPP
#define COH_TX_EP_HPP
#include "coherence/lang.ns"
#include "coherence/io/pof/PofReader.hpp"
#include "coherence/io/pof/PofWriter.hpp"
#include "coherence/io/pof/PortableObject.hpp"
#include "coherence/util/InvocableMap.hpp"
#include "coherence/util/processor/AbstractProcessor.hpp";
COH_OPEN_NAMESPACE2(coherence,tests)
using coherence::io::pof::PofReader;
using coherence::io::pof::PofWriter;
using coherence::io::pof::PortableObject;
using coherence::util::InvocableMap;
using coherence::util::processor::AbstractProcessor;
class MyTxProcessor
: public class_spec<MyTxProcessor,
extends<AbstractProcessor>,
implements<PortableObject> >
{
friend class factory<MyTxProcessor>;
protected:
MyTxProcessor();
public:
virtual Object::Holder process(InvocableMap::Entry::Handle hEntry) const;
public:
virtual void readExternal(PofReader::Handle hIn);
virtual void writeExternal(PofWriter::Handle hOut) const;
};
COH_CLOSE_NAMESPACE2
#endif // COH_TX_EP_HPP
入力プロセッサのクラスは、POFユーザー・タイプとして、クラスタ側のPOF構成ファイルで登録する必要があります。登録の際は、クライアント側でスタブ・クラスの登録に使用した同じタイプIDを使用する必要があります。次の例は、例15-1で作成したMyTxProcessorクラスの登録を示しており、例15-2で登録された同じタイプIDを使用しています。
<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config
coherence-pof-config.xsd">
<user-type-list>
<include>coherence-pof-config.xml</include>
<include>txn-pof-config.xml</include>
<user-type>
<type-id>1599</type-id>
<class-name>coherence.tests.MyTxProcessor</class-name>
</user-type>
</user-type-list>
</pof-config>
トランザクションを実行するには、クラスタ側のキャッシュ構成ファイルでトランザクション・キャッシュを定義する必要があります。トランザクション・キャッシュは、トランザクション・フレームワークでトランザクション保証を行うために使用します。トランザクション・キャッシュの詳細は、『Oracle Coherence開発者ガイド』のトランザクション・キャッシュの定義に関する項を参照してください。
次の例では、MyTxCacheという名前のトランザクション・キャッシュを作成します。これは、例15-1で入力プロセッサによって使用されていたキャッシュ名です。この構成には、リモート・クライアントから入力プロセッサを実行する際に必要なプロキシ・スキームと分散キャッシュ・スキームも含まれています。プロキシは、ポート9099のlocalhostでクライアントのTCP/IP接続を受け入れるように構成します。Coherence*Extend使用時にクラスタ側のキャッシュを構成する方法の詳細は、第3章「Coherence*Extendの設定」を参照してください。
<?xml version='1.0'?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
coherence-cache-config.xsd">
<defaults>
<serializer>pof</serializer>
</defaults>
<caching-scheme-mapping>
<cache-mapping>
<cache-name>MyTxCache</cache-name>
<scheme-name>example-transactional</scheme-name>
</cache-mapping>
<cache-mapping>
<cache-name>dist-example</cache-name>
<scheme-name>example-distributed</scheme-name>
</cache-mapping>
</caching-scheme-mapping>
<caching-schemes>
<transactional-scheme>
<scheme-name>example-transactional</scheme-name>
<service-name>TransactionalCache</service-name>
<thread-count>7</thread-count>
<high-units>15M</high-units>
<task-timeout>0</task-timeout>
<autostart>true</autostart>
</transactional-scheme>
<distributed-scheme>
<scheme-name>example-distributed</scheme-name>
<service-name>DistributedCache</service-name>
<backing-map-scheme>
<local-scheme/>
</backing-map-scheme>
<autostart>true</autostart>
</distributed-scheme>
<proxy-scheme>
<service-name>ExtendTcpProxyService</service-name>
<thread-count>5</thread-count>
<acceptor-config>
<tcp-acceptor>
<local-address>
<address>localhost</address>
<port>9099</port>
</local-address>
</tcp-acceptor>
</acceptor-config>
<autostart>true</autostart>
</proxy-scheme>
</caching-schemes>
</cache-config>
クラスタのプロキシに接続してトランザクション入力プロセッサを実行するには、リモート・クライアントにリモート・キャッシュが必要です。リモート・キャッシュは、クライアント側のキャッシュ構成ファイルで定義します。クライアント側のキャッシュを構成する方法の詳細は、第3章「Coherence*Extendの設定」を参照してください。
次の例では、リモート・キャッシュをポート9099のlocalhostに配置されているプロキシに接続するように構成します。また、リモート・キャッシュの名前(dist-example)はトランザクション入力プロセッサの開始時に使用されるクラスタ側のキャッシュの名前と一致している必要があります。
<?xml version='1.0'?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
coherence-cache-config.xsd">
<defaults>
<serializer>pof</serializer>
</defaults>
<caching-scheme-mapping>
<cache-mapping>
<cache-name>dist-example</cache-name>
<scheme-name>extend</scheme-name>
</cache-mapping>
</caching-scheme-mapping>
<caching-schemes>
<remote-cache-scheme>
<scheme-name>extend</scheme-name>
<service-name>ExtendTcpCacheService</service-name>
<initiator-config>
<tcp-initiator>
<remote-addresses>
<socket-address>
<address>localhost</address>
<port>9099</port>
</socket-address>
</remote-addresses>
<connect-timeout>30s</connect-timeout>
</tcp-initiator>
<outgoing-message-handler>
<request-timeout>30s</request-timeout>
</outgoing-message-handler>
</initiator-config>
</remote-cache-scheme>
</caching-schemes>
</cache-config>
クライアントで入力プロセッサのスタブ・クラスを起動する方法は、入力プロセッサを起動する方法と同じです。ただし、実行時にはクラスタ側の入力プロセッサが起動されます。起動がJavaクラスに委任されたことは、クライアント側では認識されません。次の例は、入力プロセッサのスタブ・クラスを使用して、結果的に例15-1で作成したトランザクション入力プロセッサを起動するクライアントを示しています。
String::View vsCacheName = "dist-example"; String::View vsKey = "AnyKey"; // retrieve the named cache NamedCache::Handle hCache = CacheFactory::getCache(vsCacheName); // invoke the cache Object::View oResult = hCache->invoke(vsKey, MyTxProcessor::create()); std::cout << "Result of extend transaction execution: " << oResult << std::endl;