25 Performing Basic Topic Publish and Subscribe Operations
- Overview of Topics API
  The com.tangosol.net.topic.NamedTopic<V> interface is the initial interface used by applications to publish and subscribe to a topic. It has factory methods to create a Publisher and a Subscriber. It also has methods to manage subscriber groups.
- Getting a Topic Instance
  To get a reference to a NamedTopic instance, applications can use the Session API or the CacheFactory API.
- Using NamedTopic Type Checking
  Coherence includes the ability to request strongly-typed NamedTopic instances when using either the Session or CacheFactory API by using explicit types.
- Publishing to a Topic
  An active Publisher, one that has not been closed, asynchronously sends a value to a topic using the send method and returns a CompletableFuture.
- Subscribing Directly to a Topic
  An active subscriber receives all values delivered to a topic.
- Releasing Subscriber Resources
  A subscriber can be closed either by explicitly invoking the Subscriber.close() operation or by creating the subscriber with a try-with-resources statement.
- Subscribing to a Subscriber Group
  A topic value is retained either until it is received by a subscriber group member or until it expires. By default, there is no expiry for topic values.
- Subscriber Groups for a NamedTopic
  Subscriber groups can be created in two ways.
- Destroying a Topic’s Subscriber Group
  A subscriber group life span is independent of its subscriber group members.
- Managing the Resources Used by a Topic
  Both Publisher and Subscriber have resources associated with them and the resources must be closed when no longer needed.
- Understanding Topics Flow Control
  Given that the topics Publisher.send and Subscriber.receive methods provide an asynchronous (non-blocking) way of submitting data exchange requests, there is a default flow control mechanism to efficiently manage these requests.
- Managing the Publisher Flow Control to Place Upper Bound on Topics Storage
  In a data processing pipeline where the published values to the topic are significantly outpacing the subscribers’ abilities to process those values, one can either allow for an unbounded storage for the unconsumed values on the topic or place a maximum storage size constraint for the retained values on the topic.
Overview of Topics API
The com.tangosol.net.topic.NamedTopic<V> interface is the initial interface used by applications to publish and subscribe to a topic. It has factory methods to create a Publisher and a Subscriber. It also has methods to manage subscriber groups.
The com.tangosol.net.topic.Publisher<V> interface has an asynchronous send method that publishes a value to a topic. The method returns a CompletableFuture<Void> that enables tracking when the send completes or whether an exception was thrown.
The com.tangosol.net.topic.Subscriber<V> interface has a receive method that returns a CompletableFuture<Element<V>>. A Subscriber either subscribes to a subscriber group (using Subscriber.Name.of(subscriberGroupName)) or subscribes directly to a topic.
Publisher Creation Options
The following Publisher creation options can be used with NamedTopic.createPublisher(Publisher.Option[]).
Publisher Option | Description |
---|---|
OnFailure.Stop | Default. If an individual Publisher.send(V) invocation fails, stop any further publishing and close the Publisher. |
OnFailure.Continue | If an individual Publisher.send(V) invocation fails, skip that value and continue to publish other values. |
FailOnFull.enabled() | When the storage size of the unprocessed values on the topic exceeds the configured high-units, the CompletableFuture returned from the Publisher.send(V) invocation completes exceptionally. Overrides the default, in which the CompletableFuture does not complete until space becomes available on the topic and the send completes. |
OrderBy.thread() | Default. Ensure that all values sent from the same thread are stored sequentially. |
OrderBy.none() | Enforce no specific ordering between sent values, allowing for the greatest level of parallelism. |
OrderBy.id(int) | Ensure ordering of sent values across all threads that share the same id. |
OrderBy.value(ToIntFunction) | Compute the unit of order by applying this function to the sent value. |
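The OrderBy.value(ToIntFunction) option in the table above takes a function that maps each sent value to an int unit of order. The following is a minimal, JDK-only sketch of what such a function could look like. The Order class and the BY_CUSTOMER name are hypothetical and are not part of Coherence; the sketch only shows that values sharing a key map to the same unit of order.

```java
import java.util.function.ToIntFunction;

final class UnitOfOrderSketch {
    /** Hypothetical value type for illustration; not part of Coherence. */
    static final class Order {
        final String customerId;
        final int amount;

        Order(String customerId, int amount) {
            this.customerId = customerId;
            this.amount = amount;
        }
    }

    /** Orders for the same customer always map to the same unit of order. */
    static final ToIntFunction<Order> BY_CUSTOMER =
            order -> order.customerId.hashCode();

    public static void main(String[] args) {
        Order first = new Order("alice", 10);
        Order second = new Order("alice", 25);
        // Same customer, so both values share one unit of order: prints "true".
        System.out.println(
                BY_CUSTOMER.applyAsInt(first) == BY_CUSTOMER.applyAsInt(second));
    }
}
```

Going by the table, a function of this shape would be handed to OrderBy.value when the publisher is created, so that values for one customer are kept in order while values for different customers are free to be processed in parallel.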
Subscriber Creation Options
The following Subscriber creation options can be used with NamedTopic.createSubscriber(Subscriber.Option[]).
Subscriber Option | Description |
---|---|
Name.of(String) | Specify the name of the subscriber group to join as a member. For each value delivered to a subscriber group, only one member of the group receives the value. |
Filtered.by(Filter) | Receive only the values that match this Filter. Only one Filter is allowed per subscriber group. |
Convert.using(Function) | Convert each topic value using the provided Function before it is returned by Subscriber.receive(). Only one Converter is allowed per subscriber group. |
CompleteOnEmpty.enabled() | When no values are left to receive, the CompletableFuture returned by Subscriber.receive() completes with a null Element. By default, the returned CompletableFuture does not complete until the next topic value is available. |
Getting a Topic Instance
To get a reference to a NamedTopic instance, applications can use the Session API or the CacheFactory API.
The Session API is the preferred approach for getting a topic because it offers a concise set of methods that allows for the use and injection of Coherence sessions that are non-static. In contrast, the CacheFactory API exposes many static methods that require knowledge of internal Coherence concepts and services. Lastly, the Session API allows for a more efficient lifecycle and integration with other frameworks, especially frameworks that use injection.
The following example creates a session using the default session provider and then gets a reference to a NamedTopic instance using the Session.getTopic method. The name of the topic is included as a parameter.

    import com.tangosol.net.*;
    import com.tangosol.net.topic.NamedTopic;
    ...
    Session session = Session.create();
    NamedTopic<String> topic = session.getTopic("topic", ValueTypeAssertion.withType(String.class));

Or, using the CacheFactory API:

    ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
    NamedTopic<String> topic =
        ccf.ensureTopic("topic", ValueTypeAssertion.withType(String.class));

To bypass generics type checking, omit the optional <value-type> definition in the topic-mapping configuration and omit the ValueTypeAssertion option as a parameter to the Session.getTopic or ConfigurableCacheFactory.ensureTopic methods.
Using NamedTopic Type Checking
Coherence includes the ability to request strongly-typed NamedTopic instances when using either the Session or CacheFactory API by using explicit types.
By default, NamedTopic<Object> instances are returned. This is the most flexible mechanism to create and use NamedTopic instances; however, it is the responsibility of the application to ensure the expected value types when interacting with topic instances.
The following example gets a topic that accepts values of any type:

    import com.tangosol.net.*;
    ...
    Session session = Session.create();
    NamedTopic<Object> topic = session.getTopic("MyTopic");

Or, using the CacheFactory API:

    ConfigurableCacheFactory ccf = CacheFactory.getConfigurableCacheFactory();
    NamedTopic<Object> topic = ccf.ensureTopic("MyTopic");

The getTopic method can be used to request a NamedTopic instance of a specific type or, if necessary, an instance without type checking. The ValueTypeAssertion interface is used with the getTopic method to assert the correctness of the type of values used with a NamedTopic instance. It can also be used to assert that a topic should use raw types. For example:

    NamedTopic<Object> topic = session.getTopic("MyTopic", ValueTypeAssertion.withRawTypes());

Likewise, when using CacheFactory:

    NamedTopic<String> topic =
        ccf.ensureTopic("MyTopic", ValueTypeAssertion.withRawTypes());

For stronger type safety, you can create a topic and explicitly assert the value types to be used by the topic. For example, to create a topic and assert that values must be of type String, an application can use:

    NamedTopic<String> topic = session.getTopic("MyTopic", ValueTypeAssertion.withType(String.class));

A NamedTopic instance is not required to adhere to the asserted type and may choose to disregard it; however, a warning message is emitted at compile time. For example:

    NamedTopic topic = session.getTopic("MyTopic", ValueTypeAssertion.withType(String.class));

Likewise, an application may choose to assert that raw types be used while the NamedTopic instance is declared with a specific type. However, in both cases, this could lead to errors if types are left unchecked. For example:

    NamedTopic<String> topic = session.getTopic("MyTopic", ValueTypeAssertion.withRawTypes());

For the strongest type safety, specific types can also be declared as part of a topic definition in the cache configuration file. A runtime error occurs if an application attempts to use types that are different from those configured as part of the topic definition. The following example configures a topic that only supports values of type String.

    <topic-mapping>
      <topic-name>MyTopic</topic-name>
      <scheme-name>distributed-topic</scheme-name>
      <value-type>String</value-type>
    </topic-mapping>

Lastly, an application can choose to disable explicit type checking. If type checking is disabled, then the application is responsible for ensuring type safety.

    NamedTopic<Object> topic = session.getTopic("MyTopic", ValueTypeAssertion.withoutTypeChecking());

Publishing to a Topic
An active Publisher, one that has not been closed, asynchronously sends a value to a topic using the send method and returns a CompletableFuture. For example, an asynchronous call is turned into a synchronous call by calling join on the CompletableFuture.
See CompletableFuture for the numerous options available for working with the completion of many outstanding asynchronous operations.

    NamedTopic<String> topic = ...;
    Publisher<String> publisher = topic.createPublisher();
    CompletableFuture<Void> future = publisher.send("someValue");

    // Completes when the asynchronous send completes.
    // Throws an exception if the send operation completed exceptionally.
    future.join();

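The blocking and non-blocking ways of consuming the CompletableFuture returned by a send can be contrasted with a JDK-only sketch. The send method here is a hypothetical stand-in that simply returns an already completed or already failed future; it is not the Coherence Publisher.send implementation, and the class and method names are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class SendCompletionSketch {
    /** Hypothetical stand-in for Publisher.send; rejects null values. */
    static CompletableFuture<Void> send(String value) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (value == null) {
            future.completeExceptionally(new IllegalArgumentException("null value"));
        } else {
            future.complete(null);
        }
        return future;
    }

    /** Blocking style: join() rethrows a failure as CompletionException. */
    static boolean sendAndWait(String value) {
        try {
            send(value).join();
            return true;
        } catch (CompletionException e) {
            return false;
        }
    }

    /** Non-blocking style: map the outcome to a status without blocking the caller. */
    static CompletableFuture<String> sendWithStatus(String value) {
        return send(value).handle((ignored, error) -> error == null ? "sent" : "failed");
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("someValue"));      // prints "true"
        System.out.println(sendWithStatus(null).join());   // prints "failed"
    }
}
```

The blocking style is the simplest to reason about, while the handle-based style lets a publishing thread keep sending and react to each outcome as it arrives.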
- Requirements for Topic Values
- Awaiting Completion of Outstanding Publisher Sends
- Releasing Publisher Resources
Requirements for Topic Values
Topic values must be serializable (for example, java.io.Serializable or Coherence Portable Object Format serialization). See Using Portable Object Format.
Awaiting Completion of Outstanding Publisher Sends
If a publisher sends a number of values to a topic without waiting for each send to complete, a Publisher.flush().join() invocation synchronously waits for all pending Publisher.send() operations to complete. Note that this invocation always completes successfully, even when one or more of the outstanding operations complete exceptionally. When it is necessary to detect that a value was not published successfully to the topic, use the CompletableFuture returned by each send to detect exceptional completion.
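Because the flush never reports individual failures, an application that needs them has to keep the futures returned by its sends. The following JDK-only sketch shows one way to do that, assuming the futures were collected in a list as they were returned. The class, the helper methods, and the sample data are hypothetical; no Coherence API is called.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class TrackSendsSketch {
    /** Waits for every tracked send to finish, then counts those that failed. */
    static long countFailed(List<CompletableFuture<Void>> sends) {
        CompletableFuture<?>[] all = sends.toArray(new CompletableFuture<?>[0]);
        // handle() absorbs the aggregate failure so that the wait itself never throws.
        CompletableFuture.allOf(all).handle((ignored, error) -> null).join();
        return sends.stream().filter(CompletableFuture::isCompletedExceptionally).count();
    }

    /** Stand-in for the future of a send that succeeded. */
    static CompletableFuture<Void> succeeded() {
        return CompletableFuture.<Void>completedFuture(null);
    }

    /** Stand-in for the future of a send that failed. */
    static CompletableFuture<Void> failed(Throwable cause) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    /** Two successful sends and one failed send. */
    static List<CompletableFuture<Void>> sampleSends() {
        return Arrays.asList(
                succeeded(), failed(new IllegalStateException("not published")), succeeded());
    }

    public static void main(String[] args) {
        System.out.println(countFailed(sampleSends())); // prints "1"
    }
}
```

In a real publisher loop, each future returned by send would be added to the list, and the count (or the failed futures themselves) would be inspected after the flush.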
Releasing Publisher Resources
Publisher resources are released by explicitly invoking Publisher.close() or by creating the publisher with a try-with-resources statement, as shown in Example 25-1.
Publisher.close() is a blocking operation and does not return until the close is complete. Once the Publisher is closed, it is no longer active (see Publisher.isActive()), and all future send operations on that Publisher fail with an IllegalStateException stating that the Publisher is no longer active. However, all outstanding CompletableFutures for send operations that occurred before the close call are allowed to complete. As with Publisher.flush().join(), those outstanding sends may complete successfully or exceptionally, and close does not throw an exception for any outstanding CompletableFuture that completes exceptionally.
Subscribing Directly to a Topic
An active subscriber receives all values delivered to a topic.

    NamedTopic<String> topic = ...;
    Subscriber<String> subscriber = topic.createSubscriber();
    CompletableFuture<Element<String>> future = subscriber.receive();
    String received = future.get().getValue();

Releasing Subscriber Resources
A subscriber can be closed either by explicitly invoking the Subscriber.close() operation or by creating the subscriber with a try-with-resources statement. For details, see Example 25-2.
Subscriber.close() is a blocking operation and does not return until the operation is completed. After close is called on a Subscriber, all future receive calls on the inactive Subscriber fail with an IllegalStateException. Most of the outstanding CompletableFutures from receive calls complete exceptionally, while some of them may complete normally.
Subscribing to a Subscriber Group
A topic value is retained either until it is received by a subscriber group member or until it expires. By default, there is no expiry for topic values.
See the <expiry-delay> subelement of <paged-topic-scheme>.

    NamedTopic<String> topic = ...;
    Subscriber<String> subscriber = topic.createSubscriber(Subscriber.Name.of("subscriberGroup"));
    CompletableFuture<Element<String>> future = subscriber.receive();
    String value = future.get().getValue();

Subscriber Groups for a NamedTopic
Subscriber groups can be created in two ways.
- Statically configure durable subscriber groups on a <topic-mapping> element in the coherence-cache-config file. See Example 20-3.
- Dynamically create a subscriber group when a Subscriber is created with the Subscriber.Name.of(aSubscriberGroupName) option to join a subscriber group, and the subscriber group does not exist yet on the topic.
NamedTopic.getSubscriberGroups returns both configured and dynamically created subscriber groups.

    NamedTopic<String> topic = ...;
    Set<String> subscriberGroups = topic.getSubscriberGroups();

Destroying a Topic’s Subscriber Group
A subscriber group life span is independent of its subscriber group members. Destroying a subscriber group affects all existing members of the group, and all outstanding asynchronous receive operations for those members are cancelled.

    NamedTopic<String> topic = ...;
    topic.destroySubscriberGroup("subscriberGroup");

Managing the Resources Used by a Topic
Both Publisher and Subscriber have resources associated with them and the resources must be closed when no longer needed.
Both Publisher and Subscriber can be used with the try-with-resources pattern. This usage pattern ensures that close is called for both of these topic resources. For details about automatically closing a Publisher, see Example 25-1. For details about automatically closing a Subscriber, see Example 25-2.
Assume that the subscriber group durableSubscriber is statically configured for topic aTopic as shown in Example 20-3.
Example 25-1 try-with-resources for Publisher

    Session session = Session.create();
    NamedTopic<String> topic = session.getTopic("aTopic", ValueTypeAssertion.withType(String.class));

    try (Publisher<String> publisher = topic.createPublisher(...)) {
        int MAX = ...;
        for (int i = 0; i < MAX; i++) {
            publisher.send("value" + i);
        }
        // Wait for all outstanding asynchronous operations to complete before the
        // publisher is implicitly closed by exiting the try-with-resources block.
        publisher.flush().join();
    }

Example 25-2 Subscriber processing with try-with-resources

    // Process outstanding values on a subscriber group.
    try (Subscriber<String> subscriber = topic.createSubscriber(
            Subscriber.Name.of("durableSubscriber"),
            Subscriber.CompleteOnEmpty.enabled())) {
        while (true) {
            // Asynchronous receive as a member of the subscriber group.
            CompletableFuture<Element<String>> result = subscriber.receive();
            Element<String> topicElement = result.get();
            if (topicElement == null) {
                // No more elements are available.
                break;
            } else {
                String value = topicElement.getValue();
                // Process the value received from the subscriber.
                ...
            }
        }
        // The subscriber and its resources are closed automatically on exit.
        // Outstanding CompletableFutures for receive will mostly complete
        // exceptionally, though some may complete. A subscriber group accumulates
        // future delivered values even if there are no active group members.
    }

NamedTopic.destroy() releases all of the topic's resources and destroys the instance. All outstanding Publishers and Subscribers are closed. Outstanding operations on Publishers and Subscribers are completed exceptionally. Subscriber groups are destroyed and all topic storage is released, including persistent storage.
Understanding Topics Flow Control
Because the Publisher.send and Subscriber.receive methods of a topic provide an asynchronous (non-blocking) way of submitting data exchange requests, there is a default flow control mechanism to efficiently manage these requests.
When the automatic flow control detects too many outstanding send or receive operations, or detects that the topic is running out of storage for values that are yet to be consumed, the outstanding CompletableFutures for the asynchronous send and receive operations may be throttled to allow the system to catch up with all the outstanding requests.
Both the Publisher and Subscriber interfaces provide a way for the application to get a FlowControl instance that allows it to opt out of automatic flow control and manually govern the rate of the request flow. See the com.tangosol.net.FlowControl Javadoc for more details.
Managing the Publisher Flow Control to Place Upper Bound on Topics Storage
In a data processing pipeline where the published values to the topic are significantly outpacing the subscribers’ abilities to process those values, one can either allow for an unbounded storage for the unconsumed values on the topic or place a maximum storage size constraint for the retained values on the topic.
By default, there are no constraints on the storage size for the values being retained by a topic. If a storage limit is configured for the topic, as illustrated in Example 25-3, there are several options available to restrict the topic’s storage usage.
Once a storage limit is configured for a topic, the default behavior is for the publishers to the topic to be throttled (blocked) when the next send would exceed the topic’s storage limit. Blocking the publishers gives the subscribers time to catch up on processing, while limiting the amount of storage used by the topic. Once the subscribers catch up and the values on the topic have been consumed by all subscribers and subscriber groups, the publishers' sends are allowed to resume.
There are Publisher creation-time options to change the default behavior. The Publisher.FailOnFull.enabled() option indicates that a publisher send should not block, but rather complete exceptionally. The default Publisher option Publisher.OnFailure.Stop results in the Publisher closing automatically when this exception occurs. This default preserves the order of published values from the publisher, because the publisher is closed on the first exceptional case in which a value could not be delivered.
Example 25-3 FailOnFull publishing

    Session session = Session.create();
    NamedTopic<String> topic = session.getTopic("aTopic", ValueTypeAssertion.withType(String.class));

    try (Publisher<String> publisher = topic.createPublisher(Publisher.FailOnFull.enabled())) {
        int MAX = ...;
        int cSent = 0;
        for (int i = 0; i < MAX; i++) {
            try {
                // Synchronous send that throws an exception if the topic is full.
                publisher.send("value" + i).join();
                cSent++;
            } catch (Exception e) {
                // When full, join throws CompletionException with a cause of
                // IllegalStateException: the topic is at capacity.
                break;
            }
        }
        if (cSent != MAX) {
            // Not all values were published to the topic.
        }
    }

If there is no need to preserve this order, the Publisher can be created with the Publisher.OnFailure.Continue option, and only the sends that occur while the topic is full fail.
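The difference between the default blocking behavior and fail-on-full behavior can be pictured with a JDK-only analogy built on a bounded BlockingQueue. This is only an analogy under stated assumptions: the queue is bounded by element count rather than by storage size, values leave it after a single take rather than after all subscribers and subscriber groups have consumed them, and the class and method names are hypothetical. No Coherence API is used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedStorageAnalogy {
    /** Fail-fast style: report false instead of waiting when storage is full. */
    static boolean publishOrFail(BlockingQueue<String> storage, String value) {
        try {
            storage.add(value); // throws IllegalStateException when the queue is full
            return true;
        } catch (IllegalStateException full) {
            return false;
        }
    }

    /** Blocking style: wait until a consumer frees space. */
    static void publishOrWait(BlockingQueue<String> storage, String value)
            throws InterruptedException {
        storage.put(value);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> storage = new ArrayBlockingQueue<>(2);
        publishOrWait(storage, "value0");
        publishOrWait(storage, "value1");
        System.out.println(publishOrFail(storage, "value2")); // full: prints "false"
        storage.take();                                        // a consumer catches up
        System.out.println(publishOrFail(storage, "value2")); // space again: prints "true"
    }
}
```

The fail-fast variant keeps the publishing thread responsive at the cost of having to handle rejected values, which mirrors the trade-off between the default behavior and the fail-on-full option described above.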
To override the default blocking on sending to a full topic, one can use the following pattern to exempt a thread from the flow control pause. This technique disables flow control and eliminates the storage limit on the topic for this NonBlocking thread.
Example 25-4 Exempt thread from flow control pause when reaching topic storage limit

    import com.tangosol.net.*;
    import com.tangosol.net.topic.*;
    import com.oracle.coherence.common.base.NonBlocking;

    Session session = Session.create();
    NamedTopic<String> topic = session.getTopic("aTopic", ValueTypeAssertion.withType(String.class));

    try (NonBlocking nb = new NonBlocking();
         Publisher<String> publisher = topic.createPublisher(...)) {
        int MAX = ...;
        for (int i = 0; i < MAX; i++) {
            publisher.send("value" + i);
        }
    }
