This chapter describes the steps that are required for a .NET client to perform cache transactions.
.NET clients perform cache operations within a transaction by leveraging the Transaction Framework API. The transaction API is not supported natively in .NET, so the transaction logic must run within an entry processor. The entry processor is implemented in Java on the cluster, and an entry processor stub class is implemented in C# on the client. Both classes use POF to serialize the processor between Java and C#.
Example 22-1 demonstrates an entry processor that performs a simple update
operation within a transaction using the transaction API. At run time, the class must be located on the classpath of the Coherence proxy server.
Example 22-1 Entry Processor for Extend Client Transaction
package coherence.tests;

import com.tangosol.coherence.transaction.Connection;
import com.tangosol.coherence.transaction.ConnectionFactory;
import com.tangosol.coherence.transaction.DefaultConnectionFactory;
import com.tangosol.coherence.transaction.OptimisticNamedCache;
import com.tangosol.coherence.transaction.exception.PredicateFailedException;
import com.tangosol.coherence.transaction.exception.RollbackException;
import com.tangosol.coherence.transaction.exception.UnableToAcquireLockException;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.extractor.IdentityExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.processor.AbstractProcessor;

import java.io.IOException;

public class MyTxProcessor extends AbstractProcessor implements PortableObject
{
    public Object process(InvocableMap.Entry entry)
    {
        // obtain a connection and transaction cache
        ConnectionFactory connFactory = new DefaultConnectionFactory();
        Connection conn = connFactory.createConnection("TransactionalCache");
        OptimisticNamedCache cache = conn.getNamedCache("MyTxCache");

        conn.setAutoCommit(false);

        // get a value for an existing entry
        String sValue = (String) cache.get("existingEntry");

        // create predicate filter
        Filter predicate = new EqualsFilter(IdentityExtractor.INSTANCE, sValue);

        try
        {
            // update the previously obtained value
            cache.update("existingEntry", "newValue", predicate);
        }
        catch (PredicateFailedException e)
        {
            // value was updated after it was read
            conn.rollback();
            return false;
        }
        catch (UnableToAcquireLockException e)
        {
            // row is being updated by another transaction
            conn.rollback();
            return false;
        }

        try
        {
            conn.commit();
        }
        catch (RollbackException e)
        {
            // transaction was rolled back
            return false;
        }
        return true;
    }

    public void readExternal(PofReader in) throws IOException
    {
    }

    public void writeExternal(PofWriter out) throws IOException
    {
    }
}
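The predicate-based update above is an optimistic check: the write succeeds only if the entry still holds the value that was read. The Coherence API is not shown here; the following is a minimal, self-contained analogy using the plain JDK (ConcurrentHashMap.replace, an atomic compare-and-set) to illustrate the same semantics that the EqualsFilter predicate enforces. The class and method names are illustrative, not part of any Coherence API.

```java
import java.util.concurrent.ConcurrentHashMap;

public class OptimisticUpdateDemo {
    // Analogous to cache.update(key, newValue, predicate): the write succeeds
    // only if the current value still equals the value that was read.
    public static boolean predicateUpdate(ConcurrentHashMap<String, String> map,
                                          String key, String readValue, String newValue) {
        // replace() is atomic: it fails if another writer changed the entry
        // after readValue was observed (the PredicateFailedException case).
        return map.replace(key, readValue, newValue);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("existingEntry", "value");

        // Read phase, then an uncontended update: succeeds.
        String read = map.get("existingEntry");
        System.out.println(predicateUpdate(map, "existingEntry", read, "newValue"));

        // A concurrent writer changes the entry between read and update: fails.
        String stale = map.get("existingEntry");
        map.put("existingEntry", "concurrentChange");
        System.out.println(predicateUpdate(map, "existingEntry", stale, "other"));
    }
}
```

Running the demo prints `true` then `false`: the second update is rejected because the value observed at read time is no longer current, which is exactly when the transactional processor rolls back and returns false.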
An entry processor stub class allows a client to use the transactional entry processor on the cluster. The stub class is implemented in C# and uses POF for serialization, which allows the entry processor to be serialized between C# and Java. The stub class does not require any transaction logic and is only a skeleton of the transactional entry processor. See Building Integration Objects (.NET) for detailed information on using POF with .NET.
Example 22-2 demonstrates an entry processor stub class for the transactional entry processor created in Example 22-1.
Example 22-2 Transaction Entry Processor .NET Stub Class
using Tangosol.IO.Pof;
using Tangosol.Net.Cache;
using Tangosol.Util.Processor;

namespace Coherence.Tests
{
    public class MyTxProcessor : AbstractProcessor, IPortableObject
    {
        public MyTxProcessor()
        { }

        public override object Process(IInvocableCacheEntry entry)
        {
            return null;
        }

        public void ReadExternal(IPofReader reader)
        { }

        public void WriteExternal(IPofWriter writer)
        { }
    }
}
Custom user types must be registered for the Java transactional entry processor in the cluster-side POF configuration file and for the client stub in the client-side POF configuration file. Both registrations must use the same type ID. The following examples demonstrate registering the MyTxProcessor class that was created in Example 22-1 and the client stub class that was created in Example 22-2, respectively.
Cluster-side POF configuration:
<?xml version="1.0"?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config
   coherence-pof-config.xsd">
   <user-type-list>
      <include>coherence-pof-config.xml</include>
      <include>txn-pof-config.xml</include>
      <user-type>
         <type-id>1599</type-id>
         <class-name>coherence.tests.MyTxProcessor</class-name>
      </user-type>
   </user-type-list>
</pof-config>
Client-side POF configuration:
<?xml version="1.0"?>
<pof-config xmlns="http://schemas.tangosol.com/pof"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://schemas.tangosol.com/pof
   assembly://Coherence/Tangosol.Config/pof-config.xsd">
   <user-type-list>
      <include>coherence-pof-config.xml</include>
      <user-type>
         <type-id>1599</type-id>
         <class-name>Coherence.Tests.MyTxProcessor</class-name>
      </user-type>
   </user-type-list>
</pof-config>
Transactions require a transactional cache to be defined in the cluster-side cache configuration file. Transactional caches are used by the Transaction Framework to provide transactional guarantees. See "Defining Transactional Caches" in Developing Applications with Oracle Coherence for details on transactional caches.
The following example creates a transactional cache that is named MyTxCache, which is the cache name that was used by the entry processor in Example 22-1. The configuration also includes a proxy scheme and a distributed cache scheme that are required to execute the entry processor from a remote client. The proxy is configured to accept client TCP/IP connections on localhost at port 7077. See Configuring Extend Proxies for detailed information on configuring cluster-side caches when using Coherence*Extend.
<?xml version='1.0'?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
   coherence-cache-config.xsd">
   <caching-scheme-mapping>
      <cache-mapping>
         <cache-name>MyTxCache</cache-name>
         <scheme-name>example-transactional</scheme-name>
      </cache-mapping>
      <cache-mapping>
         <cache-name>dist-example</cache-name>
         <scheme-name>example-distributed</scheme-name>
      </cache-mapping>
   </caching-scheme-mapping>
   <caching-schemes>
      <transactional-scheme>
         <scheme-name>example-transactional</scheme-name>
         <service-name>TransactionalCache</service-name>
         <thread-count-min>2</thread-count-min>
         <thread-count-max>10</thread-count-max>
         <high-units>15M</high-units>
         <task-timeout>0</task-timeout>
         <autostart>true</autostart>
      </transactional-scheme>
      <distributed-scheme>
         <scheme-name>example-distributed</scheme-name>
         <service-name>DistributedCache</service-name>
         <backing-map-scheme>
            <local-scheme/>
         </backing-map-scheme>
         <autostart>true</autostart>
      </distributed-scheme>
      <proxy-scheme>
         <service-name>ExtendTcpProxyService</service-name>
         <acceptor-config>
            <tcp-acceptor>
               <local-address>
                  <address>localhost</address>
                  <port>7077</port>
               </local-address>
            </tcp-acceptor>
         </acceptor-config>
         <autostart>true</autostart>
      </proxy-scheme>
   </caching-schemes>
</cache-config>
Remote clients require a remote cache to connect to the cluster's proxy and run a transactional entry processor. The remote cache is defined in the client-side cache configuration file. See Configuring Extend Proxies for detailed information on configuring client-side caches.
The following example configures a remote cache to connect to a proxy that is located on localhost at port 7077. In addition, the name of the remote cache (dist-example) must match the name of a cluster-side cache that is used when initiating the transactional entry processor.
<?xml version='1.0'?>
<cache-config xmlns="http://schemas.tangosol.com/cache"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://schemas.tangosol.com/cache
   assembly://Coherence/Tangosol.Config/cache-config.xsd">
   <caching-scheme-mapping>
      <cache-mapping>
         <cache-name>dist-example</cache-name>
         <scheme-name>extend</scheme-name>
      </cache-mapping>
   </caching-scheme-mapping>
   <caching-schemes>
      <remote-cache-scheme>
         <scheme-name>extend</scheme-name>
         <service-name>ExtendTcpCacheService</service-name>
         <initiator-config>
            <tcp-initiator>
               <remote-addresses>
                  <socket-address>
                     <address>localhost</address>
                     <port>7077</port>
                  </socket-address>
               </remote-addresses>
            </tcp-initiator>
            <outgoing-message-handler>
               <request-timeout>30s</request-timeout>
            </outgoing-message-handler>
         </initiator-config>
      </remote-cache-scheme>
   </caching-schemes>
</cache-config>
A client invokes an entry processor stub class the same way any entry processor is invoked. At run time, however, the invocation is delegated to the cluster-side entry processor; the client is unaware of the delegation. The following example demonstrates a client that uses the entry processor stub class, which results in an invocation of the transactional entry processor that was created in Example 22-1:
INamedCache cache = CacheFactory.GetCache("dist-example");

object result = cache.Invoke("AnyKey", new MyTxProcessor());

Console.Out.WriteLine("Result of extend transaction execution: " + result);
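Because the transactional entry processor reports a conflict by returning false rather than throwing, clients typically wrap the invocation in a retry loop. The following is a minimal, self-contained Java sketch of that pattern; the Supplier is a hypothetical stand-in for the cache.Invoke call shown above and is not part of any Coherence API.

```java
import java.util.function.Supplier;

public class TxRetry {
    // txInvocation stands in for cache.Invoke("AnyKey", new MyTxProcessor()),
    // which returns true when the processor commits and false on conflict.
    public static boolean invokeWithRetry(Supplier<Boolean> txInvocation, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (Boolean.TRUE.equals(txInvocation.get())) {
                return true;  // processor committed the transaction
            }
            // false: the predicate failed or the row was locked; each retry
            // re-runs the processor, which re-reads and re-applies the update
        }
        return false;  // conflicts persisted across all attempts
    }

    public static void main(String[] args) {
        // Simulated invocation that loses the optimistic race twice, then wins.
        int[] calls = {0};
        boolean committed = invokeWithRetry(() -> ++calls[0] >= 3, 5);
        System.out.println("committed after " + calls[0] + " attempts: " + committed);
    }
}
```

Bounding the attempt count matters: under sustained contention an unbounded loop could spin indefinitely, so the caller decides when to give up and surface the conflict.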