The instructions in this chapter do not cover transaction API usage in detail. See Using the Transaction Framework API in Developing Applications with Oracle Coherence.
The sections in this chapter describe the steps that are required to perform transactions.
Example 16-1 demonstrates an entry processor that performs a simple update operation within a transaction using the transaction API. At run time, the class must be located on the classpath of the extend proxy server.
Example 16-1 Entry Processor for Extend Client Transaction
package coherence.tests;

import com.tangosol.coherence.transaction.Connection;
import com.tangosol.coherence.transaction.ConnectionFactory;
import com.tangosol.coherence.transaction.DefaultConnectionFactory;
import com.tangosol.coherence.transaction.OptimisticNamedCache;
import com.tangosol.coherence.transaction.exception.PredicateFailedException;
import com.tangosol.coherence.transaction.exception.RollbackException;
import com.tangosol.coherence.transaction.exception.UnableToAcquireLockException;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.extractor.IdentityExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;
import java.io.IOException;

public class MyTxProcessor extends AbstractProcessor implements PortableObject {

    public Object process(InvocableMap.Entry entry) {
        // obtain a connection and transaction cache
        ConnectionFactory connFactory = new DefaultConnectionFactory();
        Connection conn = connFactory.createConnection("TransactionalCache");
        OptimisticNamedCache cache = conn.getNamedCache("MyTxCache");
        conn.setAutoCommit(false);

        // get a value for an existing entry
        String sValue = (String) cache.get("existingEntry");

        // create predicate filter
        Filter predicate = new EqualsFilter(IdentityExtractor.INSTANCE, sValue);

        try {
            // update the previously obtained value
            cache.update("existingEntry", "newValue", predicate);
        }
        catch (PredicateFailedException e) {
            // value was updated after it was read
            conn.rollback();
            return false;
        }
        catch (UnableToAcquireLockException e) {
            // row is being updated by another transaction
            conn.rollback();
            return false;
        }

        try {
            conn.commit();
        }
        catch (RollbackException e) {
            // transaction was rolled back
            return false;
        }
        return true;
    }

    public void readExternal(PofReader in) throws IOException {
    }

    public void writeExternal(PofWriter out) throws IOException {
    }
}
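The predicate that is passed to the update method in Example 16-1 makes the write conditional: the value is replaced only if the entry still holds the value that was read earlier. The following C++ sketch models that check with a small in-memory map. It is an illustration only. The names ToyOptimisticCache and updateIfEquals are hypothetical and are not part of the Coherence API, and the sketch does not model locking across cluster members, commit, or rollback.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical in-memory stand-in for a transactional cache.
// This is NOT a Coherence class; it only models the predicate check.
class ToyOptimisticCache
{
public:
    // Store a value unconditionally (keys must be non-empty in this sketch).
    void put(const std::string& key, const std::string& value)
    {
        assert(!key.empty());
        std::lock_guard<std::mutex> guard(m_mutex);
        m_entries[key] = value;
    }

    // Copy the current value into 'out'; returns false if the key is absent.
    bool get(const std::string& key, std::string& out) const
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        std::map<std::string, std::string>::const_iterator it = m_entries.find(key);
        if (it == m_entries.end())
        {
            return false;
        }
        out = it->second;
        return true;
    }

    // Replace the value only if the stored value still equals 'expected'.
    // Returns false when the key is absent or the value changed after it was read,
    // which corresponds to the PredicateFailedException branch in Example 16-1.
    bool updateIfEquals(const std::string& key,
                        const std::string& expected,
                        const std::string& replacement)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        std::map<std::string, std::string>::iterator it = m_entries.find(key);
        if (it == m_entries.end() || it->second != expected)
        {
            return false;
        }
        it->second = replacement;
        return true;
    }

private:
    mutable std::mutex m_mutex;
    std::map<std::string, std::string> m_entries;
};
```

In this model, a caller reads a value with get, and a later updateIfEquals succeeds only if no other writer changed the entry in between. A caller that receives false can re-read the value and decide whether to try again.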
Example 16-2 and Example 16-3 demonstrate a stub class and associated header file for the transactional entry processor created in Example 16-1. In the example, POF registration is performed within the class.
Example 16-2 Transaction Entry Processor C++ Stub Class
#include "coherence/tests/MyTxProcessor.hpp"
#include "coherence/io/pof/SystemPofContext.hpp"

COH_OPEN_NAMESPACE2(coherence,tests)

COH_REGISTER_PORTABLE_CLASS(1599, MyTxProcessor);

MyTxProcessor::MyTxProcessor()
    {
    }

void MyTxProcessor::readExternal(PofReader::Handle hIn)
    {
    }

void MyTxProcessor::writeExternal(PofWriter::Handle hOut) const
    {
    }

Object::Holder MyTxProcessor::process(InvocableMap::Entry::Handle hEntry) const
    {
    return NULL;
    }

COH_CLOSE_NAMESPACE2
Example 16-3 Transaction Entry Processor C++ Stub Class Header File
#ifndef COH_TX_EP_HPP
#define COH_TX_EP_HPP

#include "coherence/lang.ns"
#include "coherence/io/pof/PofReader.hpp"
#include "coherence/io/pof/PofWriter.hpp"
#include "coherence/io/pof/PortableObject.hpp"
#include "coherence/util/InvocableMap.hpp"
#include "coherence/util/processor/AbstractProcessor.hpp"

COH_OPEN_NAMESPACE2(coherence,tests)

using coherence::io::pof::PofReader;
using coherence::io::pof::PofWriter;
using coherence::io::pof::PortableObject;
using coherence::util::InvocableMap;
using coherence::util::processor::AbstractProcessor;

class MyTxProcessor
    : public class_spec<MyTxProcessor,
        extends<AbstractProcessor>,
        implements<PortableObject> >
    {
    friend class factory<MyTxProcessor>;

    protected:
        MyTxProcessor();

    public:
        virtual Object::Holder process(InvocableMap::Entry::Handle hEntry) const;

    public:
        virtual void readExternal(PofReader::Handle hIn);
        virtual void writeExternal(PofWriter::Handle hOut) const;
    };

COH_CLOSE_NAMESPACE2

#endif // COH_TX_EP_HPP
The following example demonstrates registering the MyTxProcessor class that was created in Example 16-1 in a POF configuration file. The registration uses the same type ID (1599) that was registered in Example 16-2:
<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config
   coherence-pof-config.xsd">
   <user-type-list>
      <include>coherence-pof-config.xml</include>
      <include>txn-pof-config.xml</include>
      <user-type>
         <type-id>1599</type-id>
         <class-name>coherence.tests.MyTxProcessor</class-name>
      </user-type>
   </user-type-list>
</pof-config>
The following example creates a transactional cache that is named MyTxCache, which is the cache name that was used by the entry processor in Example 16-1. The configuration also includes a proxy scheme and a distributed cache scheme that are required to execute the entry processor from a remote client. The proxy is configured to accept client TCP/IP connections on localhost at port 7077. See Configuring Extend Proxies.
<?xml version='1.0'?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd">
   <defaults>
      <serializer>pof</serializer>
   </defaults>
   <caching-scheme-mapping>
      <cache-mapping>
         <cache-name>MyTxCache</cache-name>
         <scheme-name>example-transactional</scheme-name>
      </cache-mapping>
      <cache-mapping>
         <cache-name>dist-example</cache-name>
         <scheme-name>example-distributed</scheme-name>
      </cache-mapping>
   </caching-scheme-mapping>
   <caching-schemes>
      <transactional-scheme>
         <scheme-name>example-transactional</scheme-name>
         <service-name>TransactionalCache</service-name>
         <thread-count-min>2</thread-count-min>
         <thread-count-max>10</thread-count-max>
         <high-units>15M</high-units>
         <task-timeout>0</task-timeout>
         <autostart>true</autostart>
      </transactional-scheme>
      <distributed-scheme>
         <scheme-name>example-distributed</scheme-name>
         <service-name>DistributedCache</service-name>
         <backing-map-scheme>
            <local-scheme/>
         </backing-map-scheme>
         <autostart>true</autostart>
      </distributed-scheme>
      <proxy-scheme>
         <service-name>ExtendTcpProxyService</service-name>
         <!-- listen on the address and port that the text and the client configuration use -->
         <acceptor-config>
            <tcp-acceptor>
               <local-address>
                  <address>localhost</address>
                  <port>7077</port>
               </local-address>
            </tcp-acceptor>
         </acceptor-config>
         <autostart>true</autostart>
      </proxy-scheme>
   </caching-schemes>
</cache-config>
The following example configures a remote cache to connect to a proxy that is located on localhost at port 7077. In addition, the name of the remote cache (dist-example) must match the name of a cluster-side cache that is used when initiating the transactional entry processor.
<?xml version='1.0'?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd">
   <defaults>
      <serializer>pof</serializer>
   </defaults>
   <caching-scheme-mapping>
      <cache-mapping>
         <cache-name>dist-example</cache-name>
         <scheme-name>extend</scheme-name>
      </cache-mapping>
   </caching-scheme-mapping>
   <caching-schemes>
      <remote-cache-scheme>
         <scheme-name>extend</scheme-name>
         <service-name>ExtendTcpCacheService</service-name>
         <initiator-config>
            <tcp-initiator>
               <remote-addresses>
                  <socket-address>
                     <address>localhost</address>
                     <port>7077</port>
                  </socket-address>
               </remote-addresses>
            </tcp-initiator>
            <outgoing-message-handler>
               <request-timeout>30s</request-timeout>
            </outgoing-message-handler>
         </initiator-config>
      </remote-cache-scheme>
   </caching-schemes>
</cache-config>
The following example demonstrates a client that uses the entry processor stub class and results in an invocation of the transactional entry processor that was created in Example 16-1:
String::View vsCacheName = "dist-example";
String::View vsKey       = "AnyKey";

// retrieve the named cache
NamedCache::Handle hCache = CacheFactory::getCache(vsCacheName);

// invoke the cache
Object::View oResult = hCache->invoke(vsKey, MyTxProcessor::create());
std::cout << "Result of extend transaction execution: " << oResult << std::endl;
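The entry processor in Example 16-1 returns false when the transaction is rolled back, so the client decides what to do next. One option, offered here as an assumption rather than a documented Coherence pattern, is to retry a bounded number of times. In the following sketch, the helper invokeWithRetry is hypothetical, and the attempt callback stands in for a call to the cache's invoke method followed by a conversion of the result to bool.

```cpp
#include <cassert>
#include <functional>

// Hypothetical helper; not part of the Coherence API.
// Calls attempt() until it returns true or maxAttempts calls have been made.
// Returns the 1-based number of the successful attempt, or 0 if every attempt failed.
int invokeWithRetry(const std::function<bool()>& attempt, int maxAttempts)
{
    assert(maxAttempts > 0);
    for (int nAttempt = 1; nAttempt <= maxAttempts; ++nAttempt)
    {
        if (attempt())
        {
            return nAttempt;
        }
    }
    return 0;
}
```

Bounding the number of attempts keeps a client from looping indefinitely when another writer keeps changing the same entry. Whether a retry is appropriate at all depends on the application.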