26 Performing Basic Topic Publish and Subscribe Operations

You can use the Coherence APIs to perform basic topic publish and subscribe operations.

Overview of Topics API

The com.tangosol.net.topic.NamedTopic<V> interface is the initial interface used by applications to publish and subscribe to a topic. It has factory methods to create a Publisher and a Subscriber. It also has methods to manage subscriber groups.

The com.tangosol.net.topic.Publisher<V> interface has an asynchronous publish method that publishes a value to a topic. The method returns a CompletableFuture<Publisher.Status> to enable tracking when the send completes or if an exception is thrown. The returned Publisher.Status contains details of the published message.

The com.tangosol.net.topic.Subscriber<V> interface has a receive method that returns a CompletableFuture<Element<V>>. A Subscriber either subscribes to a subscriber group (using the Subscriber.inGroup(subscriberGroupName) option) or subscribes directly to a topic as an anonymous subscriber.

Publisher Creation Options

The following table lists the Publisher creation options to be used with NamedTopic.createPublisher(Publisher.Option... option).

| Publisher Option | Description |
| --- | --- |
| OnFailure.Stop | Default. If an individual Publisher.send(V) invocation fails, stop any further publishing and close the Publisher. |
| OnFailure.Continue | If an individual Publisher.send(V) invocation fails, skip that value and continue to publish other values. |
| FailOnFull.enabled() | When the storage size of the unprocessed values on the topic exceeds the configured high-units value, the CompletableFuture returned from Publisher.send(V) completes exceptionally. This overrides the default behavior, in which the send blocks until space becomes available on the topic. |
| OrderBy.thread() | Default. Ensures that all values sent from the same thread are stored sequentially. |
| OrderBy.none() | Enforces no specific ordering between sent values, allowing for the greatest level of parallelism. |
| OrderBy.id(int) | Ensures ordering of sent values across all threads that share the same ID. |
| OrderBy.value(ToIntFunction) | Computes the unit-of-order by applying the supplied function to each sent value. |
| OrderBy.roundRobin() | Publishes messages to each channel in the topic in turn, looping back to channel zero after “channel count” messages have been published. |
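
To make the OrderBy.value(ToIntFunction) option more concrete, the following is a minimal sketch of a function that could be supplied to it. It uses only the JDK; the "customerId:payload" value format and the class and method names are hypothetical, and how Coherence uses the resulting int is not shown here.

```java
import java.util.function.ToIntFunction;

class OrderByValueSketch {
    // Hypothetical value format "customerId:payload". Values for the same
    // customer yield the same int, so they would share a unit-of-order.
    static int unitOfOrder(String value) {
        int sep = value.indexOf(':');
        String key = sep < 0 ? value : value.substring(0, sep);
        return key.hashCode();
    }

    public static void main(String[] args) {
        ToIntFunction<String> byCustomer = OrderByValueSketch::unitOfOrder;
        // Two values for the same customer map to the same unit-of-order.
        System.out.println(byCustomer.applyAsInt("alice:order-1")
                == byCustomer.applyAsInt("alice:order-2"));
    }
}
```

Under these assumptions, the function would be passed as Publisher.OrderBy.value(OrderByValueSketch::unitOfOrder) when calling NamedTopic.createPublisher.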

Subscriber Creation Options

The following table lists the Subscriber creation options to be used with NamedTopic.createSubscriber(Subscriber.Option... option).

| Subscriber Option | Description |
| --- | --- |
| Name.of(String) | Specifies a subscriber group name to join as a member. For each value delivered to a subscriber group, only one member of the group receives the value. |
| Filtered.by(Filter) | Subscriber.receive() returns only values that match this Filter. Only one Filter is allowed per subscriber group. |
| Convert.using(Function) | Converts each topic value using the provided Function before it is returned by Subscriber.receive(). Only one Converter is allowed per subscriber group. |
| CompleteOnEmpty.enabled() | When no values are left to receive, the CompletableFuture returned by Subscriber.receive() completes with a null element. By default, the returned CompletableFuture does not complete until the next topic value is available. |
| ChannelOwnershipListeners.of(ChannelOwnershipListener) | Adds a ChannelOwnershipListener to receive events when the channels assigned to the subscriber change. |

A Subscriber either subscribes to a subscriber group (using the Subscriber.Name.of(subscriberGroupName) option) or subscribes directly to a topic.

Getting a Topic Instance

To get a reference to a NamedTopic instance, applications can use the Session API or the CacheFactory API. The Session API is the preferred approach for getting a topic because it offers a concise set of methods that allows for the use and injection of Coherence sessions that are non-static. In contrast, the CacheFactory API exposes many static methods that require knowledge of internal Coherence concepts and services. Lastly, the Session API allows for a more efficient lifecycle and integration with other frameworks, especially frameworks that use injection.

The following examples demonstrate creating a session using a default session provider and then getting a reference to a NamedTopic instance using the Session.getTopic method. The name of the topic is included as a parameter.

import com.tangosol.net.*;
import com.tangosol.net.topic.*;
...
Session session = Session.create();
NamedTopic<String> topic = session.getTopic("topic", ValueTypeAssertion.withType(String.class));

Or


ConfigurableCacheFactory ccf   = CacheFactory.getConfigurableCacheFactory();
NamedTopic<String> topic       =
                 ccf.ensureTopic("topic", ValueTypeAssertion.withType(String.class));

To bypass generics type checking, omit the optional <value-type> definition in the topic-mapping configuration and omit the ValueTypeAssertion option as a parameter to the Session.getTopic or ConfigurableCacheFactory.ensureTopic method.

Using NamedTopic Type Checking

Coherence includes the ability to request strongly-typed NamedTopic instances when using either the Session or CacheFactory API by using explicit types. By default, NamedTopic<Object> instances are returned. This is the most flexible mechanism to create and use NamedTopic instances; however, it is the responsibility of the application to ensure the expected value types when interacting with topic instances.

The following example stores objects of any type for use by an application:

import com.tangosol.net.*;
import com.tangosol.net.topic.*;
...
Session session = Session.create();
NamedTopic<Object> topic = session.getTopic("MyTopic");

or,

ConfigurableCacheFactory ccf   = CacheFactory.getConfigurableCacheFactory();
NamedTopic<Object> topic       = ccf.ensureTopic("MyTopic");

The getTopic method can be used to request a NamedTopic instance of a specific type, including if necessary, without type checking. The ValueTypeAssertion interface is used with the getTopic method to assert the correctness of the type of values used with a NamedTopic instance. The method can be used to assert that a topic should use raw types. For example:

NamedTopic<Object> topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withRawTypes());

Likewise, when using CacheFactory:

NamedTopic<Object> topic       =
                 ccf.ensureTopic("MyTopic", ValueTypeAssertion.withRawTypes());

For stronger type safety, you can create a topic and explicitly assert the value types to be used by the topic. For example, to create a topic and assert that values must be of type String, an application can use:

NamedTopic<String> topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withType(String.class));

A NamedTopic instance is not required to adhere to the asserted type and may choose to disregard it; however, a warning message is emitted at compile-time. For example:

NamedTopic topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withType(String.class));

Likewise, an application may choose to assert that raw types be used while a NamedTopic instance uses specific types. However, in both cases, this could lead to errors if types are left unchecked. For example:

NamedTopic<String> topic = session.getTopic("MyTopic",
 ValueTypeAssertion.withRawTypes());

For the strongest type safety, specific types can also be declared as part of a topic definition in the cache configuration file. A runtime error occurs if an application attempts to use types that are different from those configured as part of the topic definition. The following example configures a topic that only supports values of type String.

<topic-mapping>
   <topic-name>MyTopic</topic-name>
   <scheme-name>distributed-topic</scheme-name>
   <value-type>String</value-type>
</topic-mapping>

Lastly, an application can choose to disable explicit type checking. If type checking is disabled, then the application is responsible for ensuring type safety.

NamedTopic<Object> topic = session.getTopic("MyTopic",
   ValueTypeAssertion.withoutTypeChecking());

Publishing to a Topic

An active Publisher, one that has not been closed, asynchronously sends a value to a topic using the send method and returns a CompletableFuture. For example, an asynchronous call is turned into a synchronous call by calling join on the CompletableFuture.

See CompletableFuture for the numerous options available for working with the completion of many outstanding asynchronous operations.

NamedTopic<String>   topic     = ….;
Publisher<String>    publisher = topic.createPublisher();
CompletableFuture<?> future    = publisher.send("someValue");

// Completes when asynchronous send completes.
// Throws an exception if send operation completed exceptionally. 
future.join();
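
As an alternative to blocking with join, a callback can be attached to the returned future. The following is a minimal sketch using only java.util.concurrent; the send method here is a hypothetical stand-in for Publisher.send so that the example runs without a cluster, and its failure condition exists only to exercise the error path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SendCallbackSketch {
    // Hypothetical stand-in for Publisher.send(value). It fails for an empty
    // value purely to exercise the error path; real failure causes differ.
    static CompletableFuture<Void> send(String value) {
        return CompletableFuture.runAsync(() -> {
            if (value.isEmpty()) {
                throw new IllegalStateException("send failed");
            }
        });
    }

    // Attaches a callback instead of blocking the calling thread with join().
    static CompletableFuture<String> sendAndReport(String value) {
        return send(value).handle((ignored, error) -> {
            if (error == null) {
                return "published " + value;
            }
            // Failures from asynchronous stages may be wrapped in CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            return "failed: " + cause.getMessage();
        });
    }

    public static void main(String[] args) {
        // join() is used here only so the demo prints before the JVM exits.
        System.out.println(sendAndReport("someValue").join());
        System.out.println(sendAndReport("").join());
    }
}
```

The handle callback runs for both outcomes, so a single callback can cover success reporting and error handling.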

Requirements for Topic Values

Topic values must be serializable (for example, java.io.Serializable or Coherence Portable Object Format serialization). See Using Portable Object Format.

Awaiting Completion of Outstanding Publisher Sends

If a publisher sends a number of values to a topic without waiting for each send to complete, a Publisher.flush().join() invocation synchronously waits for all pending Publisher.send() operations to complete. Note that this invocation always completes successfully, even when one or more of the outstanding operations complete exceptionally. When it is necessary to detect that a value was not published successfully to the topic, inspect the CompletableFuture returned by each send (for example, with isCompletedExceptionally or handle).
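
One way to find failed sends after waiting for all of them is to keep the futures and inspect each one. The following is a minimal sketch using only java.util.concurrent; the futures are built by hand as stand-ins for the results of Publisher.send(), and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FailedSendSketch {
    // Waits for every future to finish, then counts those that failed.
    static long countFailures(List<CompletableFuture<?>> sends) {
        // Wait for all sends without propagating their failures.
        CompletableFuture.allOf(sends.toArray(new CompletableFuture[0]))
                .handle((ignored, error) -> null)
                .join();
        return sends.stream()
                .filter(CompletableFuture::isCompletedExceptionally)
                .count();
    }

    // Builds stand-in futures in place of real Publisher.send() results.
    static long demo() {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("not published"));
        return countFailures(List.of(ok, failed, ok));
    }

    public static void main(String[] args) {
        System.out.println("failed sends: " + demo());
    }
}
```

In an application, the list would hold the futures returned by each send call, and a non-zero count would indicate values that need to be republished or reported.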

Releasing Publisher Resources

Publisher resources are released either by explicitly invoking Publisher.close() or by creating the publisher in a try-with-resources statement, as shown in Example 26-1. Publisher.close() is a blocking operation and does not return until the close is complete. Once the Publisher is closed, it is no longer active (see Publisher.isActive()), and all future send operations on that Publisher fail with an IllegalStateException stating that the Publisher is no longer active. However, all CompletableFutures for send operations issued before the close call are allowed to complete. As with Publisher.flush().join(), those sends may complete successfully or exceptionally, and close itself throws no exception for any outstanding CompletableFuture that completes exceptionally.

Subscribing Directly to a Topic

An active subscriber receives all values delivered to a topic. A subscriber is active from its creation until it is closed.

NamedTopic<String> topic      = ….;
Subscriber<String> subscriber = topic.createSubscriber();

CompletableFuture<Element<String>> future = subscriber.receive();
String received  = future.get().getValue();

Releasing Subscriber Resources

A subscriber can be closed either by explicitly invoking the Subscriber.close() operation or by creating the subscriber in a try-with-resources statement. For details, see Example 26-2.

Subscriber close is a blocking operation and does not return until the operation is completed. After close is called on a Subscriber, all future receive calls on the inactive Subscriber fail with an IllegalStateException. Most of the outstanding CompletableFutures from receive calls will complete exceptionally, although some of them may complete normally.

Subscribing to a Subscriber Group

A topic value either is retained until it is received by a subscriber group member or expires. By default, there is no expiry for topic values.

See the <expiry-delay> subelement of <paged-topic-scheme>.

NamedTopic<String> topic      = ….;
Subscriber<String> subscriber =
         topic.createSubscriber(Subscriber.Name.of("subscriberGroup"));
CompletableFuture<Element<String>> future = subscriber.receive();
String value  = future.get().getValue();

Subscriber Groups for a NamedTopic

A subscriber group can be created in two ways:
  • Statically configure durable subscriber group(s) on a <topic-mapping> element in the coherence-cache-config file. See Example 21-3.
  • Dynamically create a subscriber group when a Subscriber is created with the Subscriber.Name.of(aSubscriberGroupName) option to join a subscriber group, and the subscriber group does not exist yet on the topic.

The method NamedTopic.getSubscriberGroups returns both configured and dynamically created subscriber groups.

NamedTopic<String> topic            = ….;
Set<String>        subscriberGroups = topic.getSubscriberGroups();

Destroying a Topic’s Subscriber Group

A subscriber group life span is independent of its subscriber group members. The code fragment below stops accumulating values for a subscriber group and releases all unconsumed values.

All existing subscriber group members will be impacted by this operation and all outstanding asynchronous receive operations for the subscriber group members are cancelled.

NamedTopic<String> topic            = ….;
topic.destroySubscriberGroup("subscriberGroup");

Committing Messages

When using subscriber groups, messages are durable and will be delivered with an “at least once” guarantee. This means that a message will be redelivered if the subscriber disconnects for any reason or the subscriber’s channels are re-allocated to another subscriber. To indicate a message has been processed and should not be redelivered, the subscriber must commit the message. A subscriber does not have to commit every message received. Committing a message automatically commits any messages received by the subscriber prior to the committed message. Messages can be committed synchronously or asynchronously.

To synchronously commit a message, use the commit() method on the Element provided by the CompletableFuture that is returned by the subscriber’s receive() method.

For example, the code below creates a subscriber for the topic named “test-topic” in the subscriber group named “test”. A message is received, processed, and then committed. The returned Subscriber.CommitResult can be used to determine information about the commit operation, such as success or failure.
Subscriber<String> subscriber = session.createSubscriber("test-topic", Subscriber.inGroup("test"));
CompletableFuture<Subscriber.Element<String>> future = subscriber.receive();
Subscriber.Element<String> element = future.get();
String message = element.getValue();
// process message...
Subscriber.CommitResult result = element.commit();

To commit a message asynchronously, use the element’s commitAsync() method.

For example, the code below calls the element’s commitAsync() method. The CompletableFuture returned will be completed when the commit operation is completed.
CompletableFuture<Subscriber.CommitResult> future = element.commitAsync();

Messages can also be committed using just a Channel and a Position, using the Subscriber’s commit methods.

For example, in the code shown below a Subscriber receives an element, and the message’s Channel and Position are obtained from the element. After processing, the message is committed using the Channel and Position.
Subscriber<String> subscriber = session.createSubscriber("test-topic", Subscriber.inGroup("test"));
CompletableFuture<Subscriber.Element<String>> future = subscriber.receive();
Subscriber.Element<String> element = future.get();

int channel = element.getChannel();
Position position = element.getPosition();

String message = element.getValue();

// process message...

Subscriber.CommitResult result = subscriber.commit(channel, position);

Managing the Resources Used by a Topic

Both Publisher and Subscriber have resources associated with them and the resources must be closed when no longer needed.

Both Publisher and Subscriber can be used with the try-with-resources pattern, which ensures that close is called on each of these topic resources. See Example 26-1 for a Publisher and Example 26-2 for a Subscriber.

Assume subscriber group durableSubscriber is statically configured for topic aTopic as shown in Example 21-3.

Example 26-1 try-with-resource for Publisher

Session            session = Session.create();
NamedTopic<String> topic   = session.getTopic("aTopic",
                                        ValueTypeAssertion.withType(String.class));
try (Publisher<String> publisher = topic.createPublisher(…)) {
   int MAX = …;
   for (int i = 0; i < MAX; i++) {
       publisher.send("value" + i);
   }

   // Wait for all outstanding asynchronous operations to complete before the
   // publisher is implicitly closed by exiting the try-with-resources block.
   publisher.flush().join();
}

Example 26-2 Subscriber processing with try-with-resource

// Process outstanding values on a subscriber group.
try (Subscriber<String> subscriber =
         topic.createSubscriber(Subscriber.Name.of("durableSubscriber"),
                                Subscriber.CompleteOnEmpty.enabled())) {
    while (true) {
        // asynchronous receive from the subscriber group.
        CompletableFuture<Element<String>> result       = subscriber.receive();
        Element<String>                    topicElement = result.get();
        if (topicElement == null) {
            // no more elements available
            break;
        } else {
            String value = topicElement.getValue();
            // process value received from subscriber.
            ...
        }
    }
    // Auto close subscriber and its resources. Outstanding CompletableFutures for
    // receive will mostly complete exceptionally, though some may complete normally.
    // A subscriber group accumulates future delivered values even if there
    // are no active subscriber group members.
}

NamedTopic.destroy() releases all of the topic’s resources and destroys the instance. All outstanding Publishers and Subscribers are closed. Outstanding operations on Publishers and Subscribers are completed exceptionally. Subscriber groups are destroyed and all topic storage is released, including persistent storage.

Understanding Topics Flow Control

Given that topics Publisher.send and Subscriber.receive methods provide an asynchronous (non-blocking) way of submitting data exchange requests, there is a default flow control mechanism to efficiently manage these requests.

When the automated flow control detects too many outstanding send or receive operations, or that a topic is running out of storage for values waiting to be consumed, the outstanding CompletableFutures for the asynchronous send or receive operations may be throttled to allow the system to catch up with the outstanding requests. Both the Publisher and Subscriber interfaces provide a way for the application to get a FlowControl instance that allows it to opt out of automatic flow control and manually govern the rate of the request flow. See the com.tangosol.net.FlowControl Javadoc for more details.

Managing the Publisher Flow Control to Place Upper Bound on Topics Storage

In a data processing pipeline where the published values to the topic are significantly outpacing the subscribers’ abilities to process those values, one can either allow for an unbounded storage for the unconsumed values on the topic or place a maximum storage size constraint for the retained values on the topic.

By default, there is no constraint on the storage size of the values retained by a topic. If a storage limit is configured for the topic as illustrated in Example 26-3, there are several options available to restrict the topic’s storage usage.

Once a storage limit is configured for a topic, the default behavior is for the publishers to that topic to be throttled (blocked) when the next send would exceed the topic’s storage limit. Blocking the publishers gives the subscribers time to catch up on processing while limiting the amount of storage used by the topic. Once the subscribers catch up and those values on the topic have been consumed by all subscribers and subscriber groups, the publishers’ sends are allowed to resume.

Publisher creation options can change the default behavior. The Publisher.FailOnFull.enabled() option indicates that a publisher send should not block, but rather complete exceptionally. The default Publisher option Publisher.OnFailure.Stop results in the Publisher closing automatically when this exception occurs. This default preserves the order of published values from the publisher, because the publisher is closed the first time a value cannot be delivered.

Example 26-3 FailOnFull publishing

Session            session = Session.create();
NamedTopic<String> topic   = session.getTopic("aTopic",
                                        ValueTypeAssertion.withType(String.class));

try (Publisher<String> publisher = topic.createPublisher(Publisher.FailOnFull.enabled())) {
   int MAX   = …;
   int cSent = 0;
   for (int i = 0; i < MAX; i++) {
       try {
           // synchronous send that throws an exception if the topic is full.
           publisher.send("value" + i).join();
           cSent++;
       } catch (Exception e) {
           // when full, throws CompletionException with a cause of
           // IllegalStateException: the topic is at capacity.
           break;
       }
   }
   if (cSent != MAX) {
       // not all values were published to the topic.
   }
}

If there is no need to preserve this order, the Publisher can be created with the Publisher.OnFailure.Continue option, in which case only the sends that occur while the topic is full fail.

To override the default blocking on sending to a full topic, one can use the following pattern to exempt a thread from flow control pause. This technique disables flow control and eliminates the storage limit on the topic for this NonBlocking thread.

Example 26-4 Exempt thread from flow control pause when reaching topic storage limit

import com.tangosol.net.*;
import com.tangosol.net.topic.*;
import com.oracle.coherence.common.base.NonBlocking;

Session            session = Session.create();
NamedTopic<String> topic   = session.getTopic("aTopic",
                                       ValueTypeAssertion.withType(String.class));
try (NonBlocking       nb        = new NonBlocking();
     Publisher<String> publisher = topic.createPublisher(…)) {
   int MAX = …;
   for (int i = 0; i < MAX; i++) {
       publisher.send("value" + i);
   }
}