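This chapter describes the steps required to perform transactions from a C++ client: creating a transactional entry processor, creating its C++ stub class, registering the entry processor as a POF user type, configuring the cluster-side transactional cache, configuring the client-side remote cache, and invoking the entry processor from the client.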
C++ clients perform cache operations within a transaction by leveraging the Transaction Framework API. The transaction API is not supported natively on C++ and must be used within an entry processor. The entry processor is implemented in Java on the cluster and an entry processor stub class is implemented in C++ on the client. Both classes use POF to serialize between Java and C++.
Example 16-1 demonstrates an entry processor that performs a simple update operation within a transaction using the transaction API. At run time, the class must be located on the classpath of the extend proxy server.
Example 16-1 Entry Processor for Extend Client Transaction
package coherence.tests;

import com.tangosol.coherence.transaction.Connection;
import com.tangosol.coherence.transaction.ConnectionFactory;
import com.tangosol.coherence.transaction.DefaultConnectionFactory;
import com.tangosol.coherence.transaction.OptimisticNamedCache;
import com.tangosol.coherence.transaction.exception.PredicateFailedException;
import com.tangosol.coherence.transaction.exception.RollbackException;
import com.tangosol.coherence.transaction.exception.UnableToAcquireLockException;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.extractor.IdentityExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;

import java.io.IOException;

public class MyTxProcessor extends AbstractProcessor implements PortableObject
   {
   public Object process(InvocableMap.Entry entry)
      {
      // obtain a connection and transaction cache
      ConnectionFactory connFactory = new DefaultConnectionFactory();
      Connection conn = connFactory.createConnection("TransactionalCache");
      OptimisticNamedCache cache = conn.getNamedCache("MyTxCache");

      conn.setAutoCommit(false);

      // get a value for an existing entry
      String sValue = (String) cache.get("existingEntry");

      // create predicate filter
      Filter predicate = new EqualsFilter(IdentityExtractor.INSTANCE, sValue);

      try
         {
         // update the previously obtained value
         cache.update("existingEntry", "newValue", predicate);
         }
      catch (PredicateFailedException e)
         {
         // value was updated after it was read
         conn.rollback();
         return false;
         }
      catch (UnableToAcquireLockException e)
         {
         // row is being updated by another transaction
         conn.rollback();
         return false;
         }

      try
         {
         conn.commit();
         }
      catch (RollbackException e)
         {
         // transaction was rolled back
         return false;
         }

      return true;
      }

   // the processor carries no state, so there is nothing to deserialize
   public void readExternal(PofReader in) throws IOException
      {
      }

   // the processor carries no state, so there is nothing to serialize
   public void writeExternal(PofWriter out) throws IOException
      {
      }
   }
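The read, predicate, and conditional update flow in Example 16-1 is a form of optimistic concurrency: the update succeeds only if the entry still holds the value that was read earlier. The following is a minimal sketch of that idea using only the JDK. The class name OptimisticUpdateSketch and the method updateIfUnchanged are hypothetical, and ConcurrentMap.replace is used as a stand-in for the predicate check; this is not the Coherence transaction API and provides no transactional guarantees.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the read/predicate/update flow of Example 16-1.
// Uses only the JDK; it is not the Coherence transaction API.
class OptimisticUpdateSketch {

    // Replaces the value for key only if it still equals the value read earlier.
    // Returns true on success, false if another writer changed the entry meanwhile.
    static boolean updateIfUnchanged(ConcurrentMap<String, String> cache,
                                     String key, String valueRead, String newValue) {
        return cache.replace(key, valueRead, newValue);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("existingEntry", "oldValue");

        // Read the current value, then update conditionally on that value.
        String valueRead = cache.get("existingEntry");
        boolean first = updateIfUnchanged(cache, "existingEntry", valueRead, "newValue");

        // A second attempt that still relies on the stale value is rejected.
        boolean second = updateIfUnchanged(cache, "existingEntry", valueRead, "otherValue");

        System.out.println(first + " " + second + " " + cache.get("existingEntry"));
    }
}
```

The rejected second attempt corresponds to the PredicateFailedException branch in Example 16-1, where the processor rolls back and returns false.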
An entry processor stub class allows a client to use the transactional entry processor on the cluster. The stub class is implemented in C++ and uses POF for serialization. POF allows an entry processor to be serialized between C++ and Java. The entry processor stub class does not require any transaction logic and is a skeleton of the transactional entry processor. See Building Integration Objects (C++), for detailed information on using POF with C++.
Example 16-2 and Example 16-3 demonstrate a stub class and associated header file for the transactional entry processor created in Example 16-1. In the example, POF registration is performed within the class.
Example 16-2 Transaction Entry Processor C++ Stub Class
#include "coherence/tests/MyTxProcessor.hpp"
#include "coherence/io/pof/SystemPofContext.hpp"

COH_OPEN_NAMESPACE2(coherence,tests)

COH_REGISTER_PORTABLE_CLASS(1599, MyTxProcessor);

MyTxProcessor::MyTxProcessor()
   {
   }

void MyTxProcessor::readExternal(PofReader::Handle hIn)
   {
   }

void MyTxProcessor::writeExternal(PofWriter::Handle hOut) const
   {
   }

Object::Holder MyTxProcessor::process(InvocableMap::Entry::Handle hEntry) const
   {
   return NULL;
   }

COH_CLOSE_NAMESPACE2
Example 16-3 Transaction Entry Processor C++ Stub Class Header File
#ifndef COH_TX_EP_HPP
#define COH_TX_EP_HPP

#include "coherence/lang.ns"

#include "coherence/io/pof/PofReader.hpp"
#include "coherence/io/pof/PofWriter.hpp"
#include "coherence/io/pof/PortableObject.hpp"
#include "coherence/util/InvocableMap.hpp"
#include "coherence/util/processor/AbstractProcessor.hpp"

COH_OPEN_NAMESPACE2(coherence,tests)

using coherence::io::pof::PofReader;
using coherence::io::pof::PofWriter;
using coherence::io::pof::PortableObject;
using coherence::util::InvocableMap;
using coherence::util::processor::AbstractProcessor;

class MyTxProcessor
   : public class_spec<MyTxProcessor,
        extends<AbstractProcessor>,
        implements<PortableObject> >
   {
   friend class factory<MyTxProcessor>;

   protected:
      MyTxProcessor();

   public:
      virtual Object::Holder process(InvocableMap::Entry::Handle hEntry) const;

   public:
      virtual void readExternal(PofReader::Handle hIn);
      virtual void writeExternal(PofWriter::Handle hOut) const;
   };

COH_CLOSE_NAMESPACE2

#endif // COH_TX_EP_HPP
An entry processor class must be registered as a POF user type in the cluster-side POF configuration file. The registration must use the same type ID that was used to register the stub class on the client side. The following example demonstrates registering the MyTxProcessor class that was created in Example 16-1 and uses the same type ID that was registered in Example 16-2:
<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config
   coherence-pof-config.xsd">
   <user-type-list>
      <include>coherence-pof-config.xml</include>
      <include>txn-pof-config.xml</include>
      <user-type>
         <type-id>1599</type-id>
         <class-name>coherence.tests.MyTxProcessor</class-name>
      </user-type>
   </user-type-list>
</pof-config>
Transactions require a transactional cache to be defined in the cluster-side cache configuration file. Transactional caches are used by the Transaction Framework to provide transactional guarantees. See "Defining Transactional Caches" in Developing Applications with Oracle Coherence for details on transactional caches.
The following example creates a transactional cache that is named MyTxCache, which is the cache name that was used by the entry processor in Example 16-1. The configuration also includes a proxy scheme and a distributed cache scheme that are required to execute the entry processor from a remote client. The proxy is configured to accept client TCP/IP connections on localhost at port 7077. See Configuring Extend Proxies for detailed information on configuring cluster-side caches when using Coherence*Extend.
<?xml version='1.0'?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd">
   <defaults>
      <serializer>pof</serializer>
   </defaults>
   <caching-scheme-mapping>
      <cache-mapping>
         <cache-name>MyTxCache</cache-name>
         <scheme-name>example-transactional</scheme-name>
      </cache-mapping>
      <cache-mapping>
         <cache-name>dist-example</cache-name>
         <scheme-name>example-distributed</scheme-name>
      </cache-mapping>
   </caching-scheme-mapping>
   <caching-schemes>
      <transactional-scheme>
         <scheme-name>example-transactional</scheme-name>
         <service-name>TransactionalCache</service-name>
         <thread-count-min>2</thread-count-min>
         <thread-count-max>10</thread-count-max>
         <high-units>15M</high-units>
         <task-timeout>0</task-timeout>
         <autostart>true</autostart>
      </transactional-scheme>
      <distributed-scheme>
         <scheme-name>example-distributed</scheme-name>
         <service-name>DistributedCache</service-name>
         <backing-map-scheme>
            <local-scheme/>
         </backing-map-scheme>
         <autostart>true</autostart>
      </distributed-scheme>
      <proxy-scheme>
         <service-name>ExtendTcpProxyService</service-name>
         <acceptor-config>
            <tcp-acceptor>
               <local-address>
                  <address>localhost</address>
                  <port>7077</port>
               </local-address>
            </tcp-acceptor>
         </acceptor-config>
         <autostart>true</autostart>
      </proxy-scheme>
   </caching-schemes>
</cache-config>
Remote clients require a remote cache to connect to the cluster's proxy and run a transactional entry processor. The remote cache is defined in the client-side cache configuration file. See Configuring Extend Proxies, for detailed information on configuring client-side caches.
The following example configures a remote cache to connect to a proxy that is located on localhost at port 7077. In addition, the name of the remote cache (dist-example) must match the name of a cluster-side cache that is used when initiating the transactional entry processor.
<?xml version='1.0'?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd">
   <defaults>
      <serializer>pof</serializer>
   </defaults>
   <caching-scheme-mapping>
      <cache-mapping>
         <cache-name>dist-example</cache-name>
         <scheme-name>extend</scheme-name>
      </cache-mapping>
   </caching-scheme-mapping>
   <caching-schemes>
      <remote-cache-scheme>
         <scheme-name>extend</scheme-name>
         <service-name>ExtendTcpCacheService</service-name>
         <initiator-config>
            <tcp-initiator>
               <remote-addresses>
                  <socket-address>
                     <address>localhost</address>
                     <port>7077</port>
                  </socket-address>
               </remote-addresses>
            </tcp-initiator>
            <outgoing-message-handler>
               <request-timeout>30s</request-timeout>
            </outgoing-message-handler>
         </initiator-config>
      </remote-cache-scheme>
   </caching-schemes>
</cache-config>
A client invokes an entry processor stub class the same way any entry processor is invoked. However, at run time, the cluster-side entry processor is invoked. The client is unaware that the invocation has been delegated to the Java class. The following example demonstrates a client that uses the entry processor stub class and results in an invocation of the transactional entry processor that was created in Example 16-1:
String::View vsCacheName = "dist-example";
String::View vsKey       = "AnyKey";

// retrieve the named cache
NamedCache::Handle hCache = CacheFactory::getCache(vsCacheName);

// invoke the cache
Object::View oResult = hCache->invoke(vsKey, MyTxProcessor::create());
std::cout << "Result of extend transaction execution: " << oResult << std::endl;
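Because the entry processor in Example 16-1 returns false whenever the transaction is rolled back, a caller may choose to retry the invocation. The following is a minimal sketch of a bounded retry, written in Java with only the JDK. The class name TxRetrySketch, the method invokeWithRetry, and the retry limit are hypothetical; the BooleanSupplier stands in for the remote invoke call, which is not reproduced here, and the retry policy itself is an assumption rather than a documented requirement.

```java
import java.util.function.BooleanSupplier;

// Hypothetical retry helper; the BooleanSupplier stands in for the remote
// invocation of the transactional entry processor.
class TxRetrySketch {

    // Runs the attempt up to maxAttempts times. Returns the 1-based number of
    // the attempt that reported a commit, or -1 if every attempt rolled back.
    static int invokeWithRetry(BooleanSupplier attempt, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Simulated processor: reports a rollback twice, then a commit.
        int[] calls = {0};
        int result = invokeWithRetry(() -> ++calls[0] >= 3, 5);
        System.out.println("Committed on attempt " + result);
    }
}
```

A bounded loop keeps a persistently contended entry from stalling the client indefinitely; whether to retry at all depends on the application.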