40 Using the Repository API
The Coherence Repository API simplifies the implementation of the data access layer in your applications, regardless of which framework you use to build applications that use Coherence as a data store. It works equally well for plain Java applications and for applications that use CDI, where you can simply create your own repository implementations.
It is also the foundation for the Micronaut Data (see Micronaut Data with Coherence) and Spring Data repository implementations. All the functionality described in this chapter is available when using these frameworks as well. The only difference is how you define your own repositories, which is framework-specific and documented separately.
The Repository API is implemented on top of the NamedMap API and provides a number of features that make it easier to use for many typical use cases where Coherence is used as a key-value data store.
- Features and Benefits
  In addition to the basic CRUD (create, read, update, delete) functionality, the Repository API provides many features that simplify the common data management tasks.
- Implementing a Repository
  Coherence provides an abstract base class, com.oracle.coherence.repository.AbstractRepository, which your custom repository implementation needs to extend and provide implementation of three abstract methods.
- Performing the Basic CRUD Operations
  You can use the basic operations to add, remove, update, and query the repository.
- Performing Server-Side Projections
  While querying a repository for a collection of entities that satisfy some criteria is certainly a common and useful operation, sometimes you do not need all the attributes within the entity.
- Making In-Place Updates
  The most common approach for updating data in modern applications is the read-modify-write pattern.
- Using the Stream API and the Data Aggregation API
  You can query the repository to retrieve a subset of entities using a getAll method and a Filter, but sometimes you do not need the entities themselves, but a result of some computation applied to a subset of entities in the repository.
- Creating Event Listeners
  Coherence not only allows you to store, modify, query, and aggregate your data entities efficiently, but also register to receive event notifications whenever any entity in the repository changes.
- Using the Asynchronous Repository API
  In addition to the synchronous repository, AbstractRepository<ID, T>, there is an asynchronous version: AbstractAsyncRepository<ID, T>.
Features and Benefits
In addition to the basic CRUD (create, read, update, delete) functionality, the Repository API provides many features that simplify common data management tasks:
- Powerful projection features
- Flexible in-place entity updates
- First-class data aggregation support
- Stream API support
- Asynchronous API support
- Event listener support
- Declarative acceleration and index creation
- CDI (Contexts and Dependency Injection) support
Implementing a Repository
Coherence provides an abstract base class, com.oracle.coherence.repository.AbstractRepository, which your custom repository implementation must extend, providing implementations of the following three abstract methods:
/**
 * Return the identifier of the specified entity instance.
 *
 * @param entity the entity to get the identifier from
 *
 * @return the identifier of the specified entity instance
 */
protected abstract ID getId(T entity);

/**
 * Return the type of entities in this repository.
 *
 * @return the type of entities in this repository
 */
protected abstract Class<? extends T> getEntityType();

/**
 * Return the map that is used as the underlying entity store.
 *
 * @return the map that is used as the underlying entity store
 */
protected abstract M getMap();
For example, a repository implementation that stores Person entities, with String identifiers, can be as simple as:
public class PeopleRepository
        extends AbstractRepository<String, Person>
{
    private NamedMap<String, Person> people;

    public PeopleRepository(NamedMap<String, Person> people)
    {
        this.people = people;
    }

    protected NamedMap<String, Person> getMap()
    {
        return people;
    }

    protected String getId(Person person)
    {
        return person.getSsn();
    }

    protected Class<? extends Person> getEntityType()
    {
        return Person.class;
    }
}
- The getMap method returns the NamedMap that should be used as a backing data store for the repository, which in this case is provided using the constructor argument. However, it can just as easily be injected using CDI (Contexts and Dependency Injection).
- The getId method returns an identifier for a given entity.
- The getEntityType method returns the class of the entities stored in the repository.
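Even a trivial repository implementation such as the one above gives you access to all the Repository API features, which are provided by the AbstractRepository class you extended.
You can also add your own business methods to the repository class, such as finder methods that your application needs. For example, the following method finds people by their first or last name:
public Collection<Person> findByName(String name)
{
    Filter<Person> filter = Filters.like(Person::getFirstName, name)
            .or(Filters.like(Person::getLastName, name));
    return getAll(filter);
}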
You can then invoke the findByName method directly within the application to find all the people whose first or last name starts with the letter 'A', for example:
for (Person p : people.findByName("A%"))
{
    // processing
}
Performing the Basic CRUD Operations
You can use the basic operations to add, remove, update, and query the repository.
To add new entities to the repository, or replace the existing ones, you can use either the save or the saveAll method.
The save method takes a single entity as an argument and stores it in the backing NamedMap:
people.save(new Person("555-11-2222", "Aleks", 46));
The saveAll method allows you to store a batch of entities by passing either a collection or a stream of entities as an argument.
After you have some entities stored in a repository, you can query the repository using the get and getAll methods.
// gets a single person by identifier
Person person = people.get("555-11-2222");
assert person.getName().equals("Aleks");
assert person.getAge() == 46;

// fetches all the people from the repository
Collection<Person> allPeople = people.getAll();

// fetches all the people from the repository who are 18 or older
Collection<Person> allAdults = people.getAll(Filters.greaterOrEqual(Person::getAge, 18));
You can retrieve sorted results by calling the getAllOrderedBy method and specifying a Comparable property by using a method reference. The result of the following example contains all the people from the repository, sorted by age from the youngest to the oldest:
Collection<Person> peopleOrderedByAge = people.getAllOrderedBy(Person::getAge);
For more complex ordering, you can specify a Comparator to use instead. For example, if you want to sort the results of the findByName method used earlier (see Implementing a Repository), first by last name, and then by first name, you can re-implement it as follows:
public Collection<Person> findByName(String name)
{
    Filter<Person> filter = Filters.like(Person::getFirstName, name)
            .or(Filters.like(Person::getLastName, name));
    return getAllOrderedBy(filter,
            Remote.comparator(Person::getLastName)
                  .thenComparing(Person::getFirstName));
}
This example uses the Coherence Remote.comparator instead of the standard Java Comparator to ensure that the specified comparator is serializable and can be sent to the remote cluster members.
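To see why serializability matters, the following plain-JDK sketch contrasts an ordinary lambda comparator with one declared through an intersection cast to Serializable. It illustrates the general Java constraint only; the class and method names are hypothetical, and it makes no claim about how Remote.comparator is implemented internally.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical illustration class; not part of Coherence.
class SerializableComparatorSketch
{
    // An ordinary lambda comparator: its class does not implement Serializable.
    static Comparator<String> plain()
    {
        return (a, b) -> a.compareTo(b);
    }

    // The same logic, with an intersection cast that makes the lambda serializable.
    static Comparator<String> serializable()
    {
        return (Comparator<String> & Serializable) (a, b) -> a.compareTo(b);
    }

    // Attempts Java serialization and reports whether it succeeded.
    static boolean canSerialize(Object o)
    {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream()))
        {
            out.writeObject(o);
            return true;
        }
        catch (IOException e)
        {
            // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args)
    {
        System.out.println("plain:        " + canSerialize(plain()));
        System.out.println("serializable: " + canSerialize(serializable()));
    }
}
```

A comparator that cannot be serialized cannot be shipped to another process, which is the situation a remote sort has to avoid.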
Finally, to remove entities from a repository, you can use one of the remove methods:
// removes specified entity from the repository
boolean fRemoved = people.remove(person);

// removes entity with the specified identifier from the repository
boolean fRemoved = people.removeById("111-22-3333");
In both examples above, the result is a boolean indicating whether the entity was actually removed from the backing NamedMap. It may be false if the entity was not present in the repository.
If you need the removed entity itself, you can use the overloads of these methods that return it:
// removes specified entity from the repository and returns it as the result
Person removed = people.remove(person, true);
// removes entity with the specified identifier from the repository and returns it as the result
Person removed = people.removeById("111-22-3333", true);
Note:
Returning the removed entity results in additional network traffic. Therefore, unless you really need the removed entity, it is probably best not to ask for it.
To remove multiple entities in a single call, use one of the removeAll methods instead. These methods enable you to remove a set of entities by specifying either their identifiers explicitly or the criteria for removal through a Filter.
// removes all men from the repository and returns 'true' if any entity has been removed
boolean fChanged = people.removeAll(Filters.equal(Person::getGender, Gender.MALE));

// removes entities with the specified identifiers from the repository and returns
// 'true' if any entity has been removed
boolean fChanged = people.removeAllById(Set.of("111-22-3333", "222-33-4444"));

// removes all men from the repository and returns the map of removed
// entities, keyed by the identifier
Map<String, Person> mapRemoved =
        people.removeAll(Filters.equal(Person::getGender, Gender.MALE), true);

// removes the entities with the specified identifiers from the repository and returns
// the map of removed entities, keyed by the identifier
Map<String, Person> mapRemoved =
        people.removeAllById(Set.of("111-22-3333", "222-33-4444"), true);
Performing Server-Side Projections
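While querying a repository for a collection of entities that satisfy some criteria is certainly a common and useful operation, sometimes you do not need all the attributes within the entity. For example, if you need only a person's name, retrieving the entire Person instance is unnecessary and a waste of time. It is the equivalent of running SELECT * FROM PEOPLE against a relational database when SELECT name FROM PEOPLE would suffice.
With the Repository API, you can instead perform a server-side projection and retrieve only the attributes you need:
// returns the name of the person with a specified identifier
String name = people.get("111-22-3333", Person::getName);

// returns the map of names of all the people younger than 18, keyed by the person’s identifier
Map<String, String> mapNames =
        people.getAll(Filters.less(Person::getAge, 18), Person::getName);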
To retrieve more than one attribute, you can project a Fragment of the entity instead:
// returns a fragment containing the name and age of the person with a specified identifier
Fragment<Person> fragment = people.get("111-22-3333",
        Extractors.fragment(Person::getName, Person::getAge));

// retrieves the person’s name from a fragment
String name = fragment.get(Person::getName);

// retrieves the person’s age from a fragment
int age = fragment.get(Person::getAge);
You can perform the same projection across multiple entities by using one of the getAll methods:
// returns a map of fragments containing the name and age of all the people younger than 18,
// keyed by the person’s identifier
Map<String, Fragment<Person>> fragments = people.getAll(
        Filters.less(Person::getAge, 18),
        Extractors.fragment(Person::getName, Person::getAge));
Fragments can also be nested. For example, the Person class may have a nested Address object as an attribute, which in turn has street, city, and country attributes. To retrieve only the name and the country of a person in a repository, see the following example:
// returns a fragment containing the name and the 'Address' fragment of the person with a specified identifier
Fragment<Person> person = people.get(
        "111-22-3333",
        Extractors.fragment(Person::getName,
                Extractors.fragment(Person::getAddress, Address::getCountry)));

// retrieves the person’s name from the 'Person' fragment
String name = person.get(Person::getName);

// retrieves the 'Address' fragment from the 'Person' fragment
Fragment<Address> address = person.getFragment(Person::getAddress);

// retrieves the person’s country from the 'Address' fragment
String country = address.get(Address::getCountry);
Making In-Place Updates
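The most common approach for updating data in modern applications is the read-modify-write pattern. For example, the typical code to update an attribute of a Person looks similar to the following:
Person person = people.get("111-22-3333");
person.setAge(55);
people.save(person);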
This is true regardless of whether the underlying data store provides a better, more efficient way of updating data. For example, relational databases provide stored procedures for that purpose, but very few developers use them because they are not as convenient to use and do not fit well into popular application frameworks, such as JPA, Spring Data, or Micronaut Data. They also fragment the code base to some extent, splitting the business logic across the application and the data store, and require that some application code is written in SQL.
However, the read-modify-write approach is suboptimal for several reasons:
- It doubles the number of network calls the application makes to the data store, increasing the overall latency of the operation.
- It moves (potentially a lot) more data over the network than absolutely necessary.
- It may require expensive construction of a complex entity in order to perform a very simple update operation of a single attribute (this is particularly true with JPA and RDBMS back ends).
- It puts additional, unnecessary load on the data store, which is typically the hardest component of the application to scale.
- It introduces concurrency issues (that is, what should happen if the entity in the data store changes between the initial read and subsequent write), which typically requires that both the read and the write happen within the same transaction.
A much better, more efficient way to perform the update is to send the update function to the data store and execute it locally, within the data store itself (which is essentially what stored procedures are for). With the Repository API, you can do this with a single update call:
people.update("111-22-3333", Person::setAge, 55);
The above code tells Coherence to update the Person instance with the given identifier by calling the setAge method on it with the number 55 as an argument. This code is not only significantly more efficient, but also shorter and easier to write and read.
Note that you do not need to know where in the cluster a Person instance with a given identifier is located. Coherence guarantees that it will invoke the setAge method on the entity with the specified ID, on its primary owner, and automatically create a backup of the modified entity for fault tolerance.
It is also worth pointing out that this approach provides the same benefits that stored procedures do in a relational database, but without the downsides: you still write all your code in Java and keep it in the same place. This approach allows you to implement rich domain models for your data and execute business logic on your entities remotely, which works exceptionally well with Domain-Driven Design (DDD) applications.
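The concurrency benefit of applying a function where the data lives can be illustrated locally. The following sketch is a plain-JDK analogue, not the Coherence API: it assumes a ConcurrentHashMap as a stand-in for the data store and uses computeIfPresent to run the update function atomically against the stored entity. The Counter class and the update and concurrentAdds helpers are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical illustration class; not part of Coherence.
class InPlaceUpdateSketch
{
    // Hypothetical entity with a fluent mutator, in the spirit of a fluent setAge.
    static final class Counter
    {
        private long value;

        Counter add(long delta)
        {
            value += delta;
            return this;
        }

        long getValue()
        {
            return value;
        }
    }

    // Local analogue of an in-place update: the function runs atomically against
    // the stored entity, instead of the caller reading it and saving it back.
    static <ID, T, U, R> R update(ConcurrentHashMap<ID, T> store, ID id,
                                  BiFunction<T, U, R> updater, U arg)
    {
        Object[] result = new Object[1];
        store.computeIfPresent(id, (key, entity) ->
        {
            result[0] = updater.apply(entity, arg);
            return entity;
        });
        @SuppressWarnings("unchecked")
        R typed = (R) result[0];
        return typed;
    }

    // Runs many concurrent updates against one entity and returns the final value.
    static long concurrentAdds(int tasks, int addsPerTask)
    {
        ConcurrentHashMap<String, Counter> store = new ConcurrentHashMap<>();
        store.put("c-1", new Counter());

        CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
        for (int t = 0; t < tasks; t++)
        {
            futures[t] = CompletableFuture.runAsync(() ->
            {
                for (int i = 0; i < addsPerTask; i++)
                {
                    update(store, "c-1", Counter::add, 1L);
                }
            });
        }
        CompletableFuture.allOf(futures).join();
        return store.get("c-1").getValue();
    }

    public static void main(String[] args)
    {
        System.out.println(concurrentAdds(4, 1000));
    }
}
```

Because every update is applied atomically to the stored value, no increments are lost even though the callers never coordinate with each other. A separate read followed by a write would need a transaction or a lock to give the same guarantee.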
A setter such as setAge typically returns void, but you often want to know the state of the entity after the update. The solution to that problem is simple. Coherence returns the result of the specified method invocation, so all you need to do is change the setAge method to implement a fluent API:
public Person setAge(int age)
{
    this.age = age;
    return this;
}
You then get the modified Person instance as the result of the update call:
Person person = people.update("111-22-3333", Person::setAge, 55);
assert person.getAge() == 55;
Sometimes you need to perform a more complex update, or change several attributes at once. There is an update overload that allows you to specify the function to execute in that situation:
Person person = people.update("111-22-3333", p ->
{
    p.setAge(55);
    p.setGender(Gender.MALE);
    return p;
});
assert person.getAge() == 55;
assert person.getGender() == Gender.MALE;
This way you have full control of the update logic that will be executed, and the return value.
Sometimes you may want to update an entity that does not exist yet and create it as part of the update. For example, if you first check whether a Cart for a given customer exists and create a new one if it does not, the check results in network calls that can be avoided if you simply create the Cart instance as part of the Cart::addItem call. The Repository API allows you to accomplish that by using the optional EntityFactory argument:
carts.update(customerId,    // the cart/customer identifier
             Cart::addItem, // the method to invoke on a target 'Cart' instance
             item,          // the 'CartItem' to add to the cart
             Cart::new      // the 'EntityFactory' to use to create a new 'Cart' instance if the
                            // cart with the specified identifier does not exist
);
The EntityFactory interface is simple:
@FunctionalInterface
public interface EntityFactory<ID, T>
        extends Serializable
{
    /**
     * Create an entity instance with the specified identity.
     *
     * @param id identifier to create entity instance with
     *
     * @return a created entity instance
     */
    T create(ID id);
}
It has a single create method that accepts an entity identifier and returns a new instance of the entity with the given identifier. In the example above, this implies that the Cart class has a constructor similar to the following:
public Cart(Long cartId)
{
    this.cartId = cartId;
}
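The update-or-create behavior described above can be sketched with the JDK alone. The following example is an illustrative analogue, not the Coherence implementation: it assumes a ConcurrentHashMap as the store and a java.util.function.Function in place of EntityFactory. The Cart class, the update helper, and the demo method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical illustration class; not part of Coherence.
class UpdateOrCreateSketch
{
    // Hypothetical Cart with an identifier-only constructor, as described above.
    static final class Cart
    {
        private final Long cartId;
        private final List<String> items = new ArrayList<>();

        Cart(Long cartId)
        {
            this.cartId = cartId;
        }

        Cart addItem(String item)
        {
            items.add(item);
            return this;
        }

        Long getCartId()
        {
            return cartId;
        }

        List<String> getItems()
        {
            return items;
        }
    }

    // Applies the updater to the stored entity, creating the entity with the
    // factory first if no entity with the given identifier exists.
    static <ID, T, U, R> R update(ConcurrentHashMap<ID, T> store, ID id,
                                  BiFunction<T, U, R> updater, U arg,
                                  Function<ID, T> factory)
    {
        Object[] result = new Object[1];
        store.compute(id, (key, existing) ->
        {
            T entity = existing != null ? existing : factory.apply(key);
            result[0] = updater.apply(entity, arg);
            return entity;
        });
        @SuppressWarnings("unchecked")
        R typed = (R) result[0];
        return typed;
    }

    static Cart demo()
    {
        ConcurrentHashMap<Long, Cart> carts = new ConcurrentHashMap<>();
        // the first call creates the cart and adds the item
        update(carts, 42L, Cart::addItem, "book", Cart::new);
        // the second call reuses the existing cart
        return update(carts, 42L, Cart::addItem, "pen", Cart::new);
    }
}
```

The caller never checks whether the cart exists; creation and update happen in one step on the side that owns the data.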
In addition to the update methods that you can use to modify a single entity, there are also a number of updateAll methods that you can use to modify multiple entities in a single call. An example where this may be useful is when you want to apply the same function to multiple entities, as is the case when performing a stock split:
positions.updateAll(
        Filters.equal(Position::getSymbol, "AAPL"),
        Position::split, 5);
- The first argument is the Filter used to determine the set of positions to update.
- The second argument is the function to apply to each position; in this case, split(5) will be called on each Position entity with the AAPL symbol.
As with single-entity updates, the result of each function invocation is returned to the client, this time in the form of a Map containing the identifiers of the processed entities as keys, and the result of the function applied to each entity as the value.
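The shape of that result can be sketched locally. The following plain-JDK example is only an analogue of the behavior described above: it uses a Predicate in place of a Coherence Filter and an ordinary HashMap as the store. The Position class, its split method returning the new quantity, and the updateAll helper are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Predicate;

// Hypothetical illustration class; not part of Coherence.
class UpdateAllSketch
{
    // Hypothetical position entity.
    static final class Position
    {
        private final String symbol;
        private int quantity;

        Position(String symbol, int quantity)
        {
            this.symbol = symbol;
            this.quantity = quantity;
        }

        String getSymbol()
        {
            return symbol;
        }

        // Multiplies the share count and returns the new quantity.
        int split(int factor)
        {
            quantity *= factor;
            return quantity;
        }
    }

    // Applies the updater to every entity matching the filter and collects
    // the result of each invocation, keyed by the entity identifier.
    static <ID, T, U, R> Map<ID, R> updateAll(Map<ID, T> store, Predicate<T> filter,
                                              BiFunction<T, U, R> updater, U arg)
    {
        Map<ID, R> results = new HashMap<>();
        store.forEach((id, entity) ->
        {
            if (filter.test(entity))
            {
                results.put(id, updater.apply(entity, arg));
            }
        });
        return results;
    }

    static Map<String, Integer> demo()
    {
        Map<String, Position> positions = new HashMap<>();
        positions.put("p1", new Position("AAPL", 10));
        positions.put("p2", new Position("ORCL", 7));
        positions.put("p3", new Position("AAPL", 3));

        return updateAll(positions, p -> p.getSymbol().equals("AAPL"), Position::split, 5);
    }
}
```

Only the two matching positions appear in the returned map; the entity that did not match the filter is neither updated nor reported.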
Using the Stream API and the Data Aggregation API
You can query the repository to retrieve a subset of entities using a getAll method and a Filter, but sometimes you do not need the entities themselves, but a result of some computation applied to a subset of entities in the repository. For example, you may need to calculate the average salary of all the employees in a department, or the total value of all equity positions in a portfolio.
While you could certainly query the repository for the entities that need to be processed and perform the processing itself on the client, this is a very inefficient way to accomplish the task because you may end up moving a significant amount of data over the network, just to discard it after the client-side processing.
Coherence provides a number of features that allow you to perform various types of distributed processing efficiently. Just as the in-place updates leverage the Coherence Entry Processor API to perform data mutation on the cluster members that store the data, the Repository API's support for data aggregation leverages the Coherence Remote Stream API and the Aggregation API to perform read-only distributed computations efficiently. This feature allows you to move processing to the data, instead of the other way around, and to perform computation in parallel across as many CPU cores as your cluster has, instead of a handful of (or in many cases only one) cores on the client.
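For example, the following code uses the Stream API to calculate the average salary of all the employees:
double avgSalary = employees.stream()
        .collect(RemoteCollectors.averagingDouble(Employee::getSalary));
To calculate the average salary only for the employees in a specific department, you can filter the stream:
double avgSalary = employees.stream()
        .filter(e -> e.getDepartmentId() == departmentId)
        .collect(RemoteCollectors.averagingDouble(Employee::getSalary));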
However, while it works, the code above is not ideal because it processes, and potentially deserializes, all the employees in the repository to determine whether they belong to the specified department.
A better way to accomplish the same task is to use the stream method overload, which allows you to specify the Filter to create a stream based on:
double avgSalary = employees.stream(Filters.equal(Employee::getDepartmentId, departmentId))
        .collect(RemoteCollectors.averagingDouble(Employee::getSalary));
The difference is subtle, but important. Unlike the previous example, this one allows Coherence to perform the query before creating the stream and to leverage any indexes you may have in the process. This can significantly reduce the overhead when dealing with large data sets.
double avgSalary = employees.average(Employee::getSalary);
double avgSalary = employees.average(
        Filters.equal(Employee::getDepartmentId, departmentId),
        Employee::getSalary);
These examples use the repository aggregation methods directly, which make common tasks such as finding the min, max, average, and sum of any entity attribute simple.
There are also more advanced aggregations, such as groupBy and top:
Map<Gender, Set<Person>> peopleByGender = people.groupBy(Person::getGender);
Map<Long, Double> avgSalaryByDept =
        employees.groupBy(Employee::getDepartmentId, averagingDouble(Employee::getSalary));
List<Double> top5salaries = employees.top(Employee::getSalary, 5);
The simpler ones are count and distinct.
In many cases, you do not want only the min, max, or top values of an attribute, but also the entities to which these values belong. For such situations, you can use the minBy, maxBy, and topBy methods, which return the entities containing the minimum, maximum, and top values of an attribute, respectively:
Optional<Person> oldestPerson = people.maxBy(Person::getAge);
Optional<Person> youngestPerson = people.minBy(Person::getAge);
List<Employee> highestPaidEmployees = employees.topBy(Employee::getSalary, 5);
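To make the shape of these results concrete, the following sketch computes comparable aggregates with the standard java.util.stream API over a small in-memory list. It runs entirely on the local JVM and does not demonstrate distributed execution; the Employee class, the sample data, and the method names are hypothetical and are not the Repository API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Coherence.
class AggregationSketch
{
    // Hypothetical employee entity.
    static final class Employee
    {
        private final String name;
        private final long departmentId;
        private final double salary;

        Employee(String name, long departmentId, double salary)
        {
            this.name = name;
            this.departmentId = departmentId;
            this.salary = salary;
        }

        String getName()
        {
            return name;
        }

        long getDepartmentId()
        {
            return departmentId;
        }

        double getSalary()
        {
            return salary;
        }
    }

    static final List<Employee> EMPLOYEES = List.of(
            new Employee("Ana", 1L, 100.0),
            new Employee("Bo", 1L, 200.0),
            new Employee("Cy", 2L, 400.0));

    // Local counterpart of an average restricted to one department.
    static double averageSalary(long departmentId)
    {
        return EMPLOYEES.stream()
                .filter(e -> e.getDepartmentId() == departmentId)
                .collect(Collectors.averagingDouble(Employee::getSalary));
    }

    // Local counterpart of a grouped aggregation: average salary per department.
    static Map<Long, Double> averageSalaryByDepartment()
    {
        return EMPLOYEES.stream()
                .collect(Collectors.groupingBy(Employee::getDepartmentId,
                        Collectors.averagingDouble(Employee::getSalary)));
    }

    // Local counterpart of a "max by" query: the entity that holds the maximum value.
    static Optional<Employee> highestPaid()
    {
        return EMPLOYEES.stream().max(Comparator.comparingDouble(Employee::getSalary));
    }
}
```

The difference in the repository is where the work happens: the same kind of computation is evaluated on the cluster members that hold the data, and only the aggregated result travels to the client.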
Using Declarative Acceleration and Index Creation
Coherence uses indexes to optimize queries and aggregations. Indexes allow you to avoid deserializing entities stored across the cluster, which is an expensive operation when you have large data sets with complex entity classes. The indexes themselves can also be sorted, which is helpful when executing range-based queries, such as less, greater, or between.
The standard way to create indexes is by calling the NamedMap.addIndex method. However, the Repository API introduces a simpler, declarative way of index creation.
To define an index, annotate the accessor of the entity attribute that you want to index with the @Indexed annotation:
public class Person
{
    // defines an unordered index on 'Person::getName', which is suitable for filters
    // such as 'equal', 'like', and 'regex'
    @Indexed
    public String getName()
    {
        return name;
    }

    // defines an ordered index on 'Person::getAge', which is better suited for filters
    // such as 'less', 'greater', and 'between'
    @Indexed(ordered = true)
    public int getAge()
    {
        return age;
    }
}
When the repository is created, it introspects the entity class for the @Indexed annotation and automatically creates an index for each attribute that has this annotation. The created index is then used whenever that attribute is referenced within a query expression.
In some cases, you may want to keep the deserialized entity instances instead of discarding them. Retaining the instances can be beneficial if you frequently run queries or aggregations, use the Stream API, or make in-place updates or projections, because the cost of maintaining individual indexes on all the attributes may be greater than the cost of retaining the deserialized entity instances.
When you work with the NamedMap API directly, this is typically done by configuring a DeserializationAccelerator. However, if you use the Repository API, there is an easier way: annotate either the entity class or the repository class itself with the @Accelerated annotation:
@Accelerated
public class Person
{
}
You will require additional storage capacity in the cluster to store both the serialized and deserialized copy of all the entities, but in some situations the performance benefits can significantly outweigh the cost. In other words, acceleration is a classic example of a time–space tradeoff, and it is entirely up to you to decide when it makes sense to use it.
Creating Event Listeners
Coherence not only allows you to store, modify, query, and aggregate your data entities efficiently, but also register to receive event notifications whenever any entity in the repository changes.
To observe such events, you can implement a listener and register it with the repository:
public static class PeopleListener
        implements PeopleRepository.Listener<Person>
{
    public void onInserted(Person personNew)
    {
        // handle INSERT event
    }

    public void onUpdated(Person personOld, Person personNew)
    {
        // handle UPDATE event
    }

    public void onRemoved(Person personOld)
    {
        // handle REMOVE event
    }
}

people.addListener(new PeopleListener());
people.addListener("111-22-3333", new PeopleListener());
people.addListener(Filters.greater(Person::getAge, 17), new PeopleListener());
- The first people.addListener call registers a listener that will be notified whenever any entity in the repository is inserted, updated, or removed.
- The second line registers a listener that will be notified when an entity with the specified identifier is inserted, updated, or removed.
- The third line registers a listener that will be notified when any Person older than 17 is inserted, updated, or removed.
As shown in the example above, there are several ways to register only for the events you are interested in, to reduce the number of events received and the amount of data sent over the network.
Note:
All of the listener methods used in the above example have a default no-op implementation. Therefore, you need to implement only the ones you actually want to handle.
Alternatively, you can use the fluent Listener Builder API to create a listener:
people.addListener(
        people.listener()
              .onInsert(personNew -> { /* handle INSERT event */ })
              .onUpdate((personOld, personNew) -> { /* handle UPDATE event with old value */ })
              .onUpdate(personNew -> { /* handle UPDATE event without old value */ })
              .onRemove(personOld -> { /* handle REMOVE event */ })
              .build()
);
Note:
When using the Listener Builder API, you have the option of omitting the old entity value from the onUpdate event handler argument list. You can also specify multiple handlers for the same event type, in which case they will be composed and invoked in the specified order.
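One way to picture handler composition is with the andThen methods of the JDK functional interfaces. The sketch below is a deliberately simplified, hypothetical builder written for illustration; it is not the Coherence listener builder, and its Builder class and fire methods are assumptions. It shows two update handlers, one with and one without the old value, being composed and invoked in registration order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Coherence.
class ListenerBuilderSketch
{
    // Simplified builder that composes handlers registered for the same event type.
    static final class Builder<T>
    {
        private Consumer<T> insertHandler = entity -> {};
        private BiConsumer<T, T> updateHandler = (oldValue, newValue) -> {};

        Builder<T> onInsert(Consumer<T> handler)
        {
            insertHandler = insertHandler.andThen(handler);
            return this;
        }

        Builder<T> onUpdate(BiConsumer<T, T> handler)
        {
            updateHandler = updateHandler.andThen(handler);
            return this;
        }

        // Variant that ignores the old value and adapts to the two-argument form.
        Builder<T> onUpdate(Consumer<T> handler)
        {
            return onUpdate((oldValue, newValue) -> handler.accept(newValue));
        }

        void fireInsert(T entity)
        {
            insertHandler.accept(entity);
        }

        void fireUpdate(T oldValue, T newValue)
        {
            updateHandler.accept(oldValue, newValue);
        }
    }

    static List<String> demo()
    {
        List<String> log = new ArrayList<>();
        Builder<String> listener = new Builder<String>()
                .onInsert(v -> log.add("insert:" + v))
                .onUpdate((o, n) -> log.add("update:" + o + "->" + n))
                .onUpdate(n -> log.add("updated-to:" + n));

        listener.fireInsert("a");
        listener.fireUpdate("a", "b");
        return log;
    }
}
```

In this sketch a single update event reaches both update handlers, first the one that receives the old value and then the one that does not, matching the order in which they were registered.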
You can also register a single handler that receives all the events, regardless of the event type:
people.addListener(
        people.listener()
              .onEvent(person -> { /* handle all events */ })
              .build()
);
Just as when implementing the listener class explicitly, you can pass the entity identifier or a Filter as the first argument to the addListener method to limit the scope of the events received.
Using the Asynchronous Repository API
In addition to the synchronous repository, AbstractRepository<ID, T>, there is an asynchronous version: AbstractAsyncRepository<ID, T>. You must implement the same abstract methods as described in Implementing a Repository.
The main difference between the two APIs is that the asynchronous API returns a java.util.concurrent.CompletableFuture of the return type. For example, Collection<T> getAll() in the blocking version would be CompletableFuture<Collection<T>> in the asynchronous version of the Repository API.
The asynchronous API also offers callbacks to which the results of the operation are passed as they become available, instead of buffering the results into a collection before returning. This feature enables you to stream and process very large result sets without paying the cost of accumulating all the results in memory, which is not possible with the blocking API.
Example 40-1 AbstractAsyncRepository Examples
public class AsyncPeopleRepository
        extends AbstractAsyncRepository<String, Person>
{
    private AsyncNamedMap<String, Person> people;

    public AsyncPeopleRepository(AsyncNamedMap<String, Person> people)
    {
        this.people = people;
    }

    protected AsyncNamedMap<String, Person> getMap()
    {
        return people;
    }

    protected String getId(Person entity)
    {
        return entity.getSsn();
    }

    protected Class<? extends Person> getEntityType()
    {
        return Person.class;
    }
}
- The getMap method returns the AsyncNamedMap that should be used as a backing data store for the repository, which in this case is provided through the constructor argument. However, it can just as easily be injected through CDI (Contexts and Dependency Injection).
- The getId method returns an identifier for a given entity.
- The getEntityType method returns the class of the entities stored in the repository.
The following example uses the AsyncPeopleRepository to make a simple query for an entity:
String uppercaseName = asyncPeople.get("111-22-3333")
        .thenApply(Person::getName)
        .thenApply(String::toUpperCase)
        .get();
- The first line gets a CompletableFuture<Person> based on the ID.
- When the future is completed, the second line obtains the person’s name from the Person instance.
- The third line converts the name to uppercase.
- The fourth line blocks and returns the uppercase name.
This usage pattern will be similar across all the methods that return CompletableFuture.
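The same chaining pattern can be tried without a cluster. The sketch below assumes a hypothetical findAsync method that stands in for an asynchronous repository lookup and completes on a background thread. It shows both the blocking style, which waits for the final value, and a non-blocking style, which hands the value to a callback. It uses join() rather than get() only to avoid checked exceptions in a short example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Coherence.
class AsyncChainSketch
{
    // Hypothetical person entity.
    static final class Person
    {
        private final String name;

        Person(String name)
        {
            this.name = name;
        }

        String getName()
        {
            return name;
        }
    }

    // Hypothetical stand-in for an asynchronous repository lookup.
    static CompletableFuture<Person> findAsync(String id)
    {
        return CompletableFuture.supplyAsync(() -> new Person("Aleks"));
    }

    // Blocking style: chain the transformations, then wait for the final value.
    static String uppercaseNameBlocking(String id)
    {
        return findAsync(id)
                .thenApply(Person::getName)
                .thenApply(String::toUpperCase)
                .join();
    }

    // Non-blocking style: pass the final value to a callback when it is ready.
    static CompletableFuture<Void> uppercaseNameAsync(String id, Consumer<String> callback)
    {
        return findAsync(id)
                .thenApply(Person::getName)
                .thenApply(String::toUpperCase)
                .thenAccept(callback);
    }

    public static void main(String[] args)
    {
        System.out.println(uppercaseNameBlocking("111-22-3333"));
        uppercaseNameAsync("111-22-3333", System.out::println).join();
    }
}
```

Blocking at the end of the chain is convenient in examples, but the non-blocking form is usually the reason to choose an asynchronous API in the first place.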
Using Asynchronous Callbacks
Instead of waiting for an entire collection of results to be realized, you can define a callback that is invoked as results become available. These APIs return CompletableFuture<Void> to signal that all the results have been processed.
For example, the following code prints the name of each person as the results arrive:
asyncPeople.getAll(person -> System.out.println(person.getName()))
        .thenAccept(done -> System.out.println("DONE!"));
- The first line of code prints the name of each Person within the repository.
- The second line prints DONE! when the names of all the people have been processed.
If you need only a single attribute of each entity, such as the name, you can extract it on the server by passing a ValueExtractor for it as the first argument. In this case, to move less data over the network, you can rewrite the code in the above example as follows:
asyncPeople.getAll(Person::getName, (id, name) -> System.out.println(name))
        .thenAccept(done -> System.out.println("DONE!"));
- The first line of code prints the name of each Person within the repository.
- The second line prints DONE! when the names of all the people have been processed.
In the example above, the callback is implemented as a BiConsumer that receives the entity identifier and the extracted value as arguments. You can also use the fragment extractor as the first argument to the getAll method above, in which case the second argument to the callback will be Fragment<Person> instead of just the name attribute.