5 Oracle Big Data Cartridges

Oracle Event Processing supports Big Data with the Hadoop and NoSQL data cartridges. The Hadoop cartridge is an extension that lets an Oracle CQL processor access large quantities of data in a Hadoop distributed file system (HDFS), which is a non-relational data store. The NoSQL cartridge is an extension that lets an Oracle CQL processor access large quantities of data in an Oracle NoSQL Database, which stores data in key-value pairs.

This chapter includes the following sections:

  • Hadoop Data Cartridge

  • NoSQL Data Cartridge

5.1 Hadoop Data Cartridge

Hadoop is an open source technology that provides access to large data sets that are distributed across clusters. One strength of the Hadoop software is that it provides access to large quantities of data not stored in a relational database. The Oracle Event Processing 12.1.3 data cartridge is based on the Cloudera distribution for Hadoop (CDH), version 3u5.

The content in this guide assumes that you are already familiar with, and likely running, a Hadoop system. If you need more information about Hadoop, start with the Hadoop project web site at http://hadoop.apache.org/.

Note:

You can use the Hadoop data cartridge on UNIX and Windows even though Hadoop itself runs only in the Linux environment.

5.1.1 Understanding the Oracle Event Processing Hadoop Data Cartridge

You can use the Hadoop data cartridge to integrate an existing Hadoop data source into an event processing network that can process data from files on the Hadoop distributed file system. With the data source integrated, you can write Oracle CQL query code that incorporates data from files on the Hadoop system.

When integrating a Hadoop system, keep the following guidelines in mind:

  • The Hadoop cluster must have been started through its own mechanism and must be accessible. The cluster is not managed directly by Oracle Event Processing.

  • A file from a Hadoop system supports joins on a single key only in Oracle CQL. However, any property of the associated event type can be used as the key. In other words, the key does not have to be a String; a key of any supported type other than byte array can be used.

  • Joins must use the equals operator. Other operators are not supported in a join condition.

  • For the event type you define to represent data from the Hadoop file, only tuple-based event types are supported.

  • The order of properties in the event type specification must match the order of fields in the Hadoop file.

  • To avoid throwing a NullPointerException, wait for the Hadoop Data Cartridge to finish processing before attempting to shut down the server or undeploy the application.

  • Only the following Oracle CQL to Hadoop types are supported. Any other type will cause a configuration exception to be raised.


    Table 5-1 Mapping Between Datatypes for Oracle CQL and Hadoop

    Oracle CQL Datatype    Hadoop Datatype

    int                    int
    bigint                 long
    float                  float
    double                 double
    char                   chararray
    java.lang.String       chararray
    byte                   bytearray
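
As an illustration of the ordering and type guidelines above, consider a hypothetical Hadoop file in which each line holds an order ID, a user ID, a quantity, and a unit price, in that order. A matching tuple-based event type might be declared as in the following sketch. The type name OrderLine, the property names, and the sample line are assumptions made for this example only.

```xml
<!-- Sketch only: OrderLine and its properties are hypothetical.
     Assumed line format in the Hadoop file:
         1001,user42,3,19.99
     The properties are declared in the same order as the fields in the file,
     and each uses an Oracle CQL type listed in Table 5-1. -->
<wlevs:event-type type-name="OrderLine">
    <wlevs:properties>
        <wlevs:property name="orderId" type="bigint"/>
        <wlevs:property name="userId" type="char"/>
        <wlevs:property name="quantity" type="int"/>
        <wlevs:property name="unitPrice" type="double"/>
    </wlevs:properties>
</wlevs:event-type>
```

Declaring the properties in a different order than the fields appear in the file would bind values to the wrong properties, so it is worth checking the declaration against a sample line from the file.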


5.1.1.1 Usage Scenario: Using Purchase Data to Develop Buying Incentives

To understand how a Hadoop data source might be used with an Oracle Event Processing application, consider a scenario with an application that requires quick access to a very large amount of customer purchase data in real time.

In this case, the data stored in Hadoop includes all purchases by all customers from all stores. Values in the data include customer identifiers, store identifiers, product identifiers, and so on. The purchase data includes information about which products are selling best in each store. To render the data to a manageable state, a MapReduce function is used to examine the data and produce a list of top buyers (those to whom incentives will be sent).

This data is collected and managed by a mobile application vendor as part of a service designed to send product recommendations and incentives (including coupons) to customers. The data is collected from multiple retailers and maintained separately for each retailer.

The Oracle Event Processing application provides the middle-tier logic for a client-side mobile application that is designed to offer purchase incentives to top buyers. It works in the following way:

  1. Retailers arrange with the mobile application vendor to provide purchase data as part of a program to offer incentives to top buyers. The data, regularly refreshed from store sales data, is stored in a Hadoop system and a MapReduce function is used to identify top buyers.
  2. The mobile application vendor provides the application for download, noting which retailers support the program.
  3. App users each create a user ID that is correlated by the app vendor to data about customers from the retailers.
  4. The mobile application is designed to send location data to the Oracle Event Processing application, along with the user ID. This information -- location coordinates and user ID -- forms the event data received by the Oracle Event Processing application.
  5. As the Oracle Event Processing application receives event data from the mobile application, it uses Oracle CQL queries to:
    • Determine whether the user is near a store from a participating retailer.

    • Establish (from Hadoop-based data) whether the user is a top buyer for the retailer.

    • Locate purchase information related to that user as a buyer from that retailer.

    • If the user is a top buyer, the application correlates products previously purchased with incentives currently being offered to buyers of those products.

  6. The Oracle Event Processing application pushes an incentive announcement to the user.

5.1.1.2 Data Cartridge Name

The Oracle Event Processing Hadoop cartridge uses the cartridge ID com.oracle.cep.cartridge.hadoop.

5.1.2 Using Hadoop Data Sources in Oracle CQL

You use the Hadoop support included with Oracle Event Processing by integrating a file in an existing Hadoop system into an event processing network. With the file integrated, you have access to data in the file from Oracle CQL code.

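This section describes the following:

  • Configuring Integration of Oracle Event Processing and Hadoop

  • Integrating a File from a Hadoop System Into an EPN

  • Using Hadoop Data in Oracle CQL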

5.1.2.1 Configuring Integration of Oracle Event Processing and Hadoop

In order to use Hadoop from Oracle Event Processing, you must first make configuration changes on both the Oracle Event Processing and Hadoop servers:

  • On the Oracle Event Processing server, add the following Hadoop configuration files to the server's bootclasspath: core-site.xml, hdfs.xml, and mapred.xml. See Oracle Fusion Middleware Administering Oracle Event Processing for information about the bootclasspath.

  • On the Hadoop server, copy the Pig JAR file to the lib directory and include it in the HADOOP_CLASSPATH defined in the hadoop-env.sh file.

Note:

A connection with a Hadoop data source through the cartridge might require many input/output operations, such that undeploying the application can time out or generate errors that prevent the application from being deployed again. Before undeploying an application that uses a Hadoop cartridge, be sure to discontinue event flow into the application.

5.1.2.2 Integrating a File from a Hadoop System Into an EPN

Integrating a file from an existing Hadoop system is similar to the way you might integrate a table from an existing relational database. For a Hadoop file, you use the file XML element from the Oracle Event Processing schema specifically added for Hadoop support.

The file element is from the http://www.oracle.com/ns/ocep/hadoop namespace, so your EPN assembly file must reference that namespace. The file element includes the following attributes:

  • id -- Uniquely identifies the file in the EPN. You will use this attribute's value to reference the data source in a processor.

  • event-type -- A reference to the event-type to which data from the file should be bound. The event-type must be defined in the EPN.

  • path -- The path to the file in the Hadoop file system.

  • separator -- Optional. The character delimiter to use when parsing the lines in the Hadoop file into separate fields. The default delimiter is the comma (',') character.

  • operation-timeout -- Optional. The maximum amount of time, in milliseconds, to wait for the operation to complete.
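
As a minimal sketch of the optional attributes, the following file element overrides the default comma delimiter and sets an operation timeout. The pipe separator and the 5000 millisecond timeout are illustrative values chosen for this example, not defaults or recommendations.

```xml
<!-- Sketch only: the '|' separator and the 5000 ms timeout are assumed values. -->
<hadoop:file id="CustomerDescription" event-type="CustomerDescription"
    path="CustomerDescription.txt"
    separator="|"
    operation-timeout="5000" />
```

With this declaration, a line in the file would be split into fields at each '|' character instead of at each comma.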

With the Hadoop file to integrate specified with the file element, you use the table-source element to add the file as a data source for the Oracle CQL processor in which you will be using the file's data.

In the following example, note that the http://www.oracle.com/ns/ocep/hadoop namespace (and hadoop prefix) is referenced in the beans element. The file element references a CustomerDescription.txt file for data, along with a CustomerDescription event type defined in the event type repository.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
    xmlns:hadoop="http://www.oracle.com/ns/ocep/hadoop"
    xsi:schemaLocation="
        http://www.bea.com/ns/wlevs/spring
        http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
        http://www.oracle.com/ns/ocep/hadoop
        http://www.oracle.com/ns/ocep/hadoop/ocep-hadoop.xsd">
<!-- Some schema references omitted for brevity. -->

    <!-- Event types that will be used in the query. -->
    <wlevs:event-type-repository>
        <wlevs:event-type type-name="SalesEvent">
            <wlevs:class>com.bea.wlevs.example.SalesEvent</wlevs:class>
        </wlevs:event-type>
        <wlevs:event-type type-name="CustomerDescription">
            <wlevs:properties>
                <wlevs:property name="userId" type="char"/>
                <wlevs:property name="creditScore" type="int"/>
                <wlevs:property name="address" type="char"/>
                <wlevs:property name="customerName" type="char"/>
            </wlevs:properties>
        </wlevs:event-type>
    </wlevs:event-type-repository>

    <!-- Input adapter omitted for brevity. -->

    <!-- Channel sending SalesEvent instances to the processor. -->
    <wlevs:channel id="S1" event-type="SalesEvent" >
        <wlevs:listener ref="P1"/>
    </wlevs:channel>

    <!-- The file element to integrate CustomerDescription.txt file from 
        the Hadoop system into the EPN. -->
    <hadoop:file id="CustomerDescription" event-type="CustomerDescription"
        path="CustomerDescription.txt" />

    <!-- The file from the Hadoop system tied into the query processor
        with the table-source element. -->
    <wlevs:processor id="P1">
        <wlevs:table-source ref="CustomerDescription" />
    </wlevs:processor>

    <!-- Other stages omitted for brevity. -->

</beans>

5.1.2.3 Using Hadoop Data in Oracle CQL

After you have integrated a Hadoop file into an event processing network, you can query the file from Oracle CQL code.

The previous example illustrates how you can add a file from a Hadoop system into an EPN. With the file added to the EPN, you can query it from Oracle CQL code, as shown in the following example.

In the following example, the processor receives SalesEvent instances from a channel, but also has access to a file in the Hadoop system as CustomerDescription instances. The Hadoop file is essentially a CSV file that lists customers. Both event types have a userId property.

<n1:config
    xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <processor>
        <name>P1</name>
        <rules>
            <query id="q1"><![CDATA[
                SELECT customerName, creditScore, price, item
                FROM S1 [Now], CustomerDescription as cust
                WHERE S1.userId = cust.userId
                AND S1.price > 1000
            ]]></query>
        </rules>
    </processor>
</n1:config>
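
The guidelines in Section 5.1.1 allow a join key of any supported type other than byte array. As a sketch of that point, the following variant of the processor configuration joins on a numeric key. It assumes that both SalesEvent and the CustomerDescription event type define an int property named customerNumber; that property is hypothetical and is not declared in the earlier examples.

```xml
<!-- Sketch only: customerNumber is a hypothetical int property assumed to
     exist on both SalesEvent and CustomerDescription. -->
<processor>
    <name>P1</name>
    <rules>
        <query id="q2"><![CDATA[
            SELECT customerName, item
            FROM S1 [Now], CustomerDescription as cust
            WHERE S1.customerNumber = cust.customerNumber
        ]]></query>
    </rules>
</processor>
```

The join still uses a single key and the equals operator, as the guidelines require.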

5.2 NoSQL Data Cartridge

The Oracle NoSQL Database is a distributed key-value database. In it, data is stored as key-value pairs, which are written to one or more storage nodes. Storage nodes are replicated to ensure high availability, rapid failover in the event of a node failure, and optimal load balancing of queries.

The content in this guide assumes that you are already familiar with, and likely running, an Oracle NoSQL database. If you need more information about Oracle NoSQL, be sure to see its Oracle Technology Network page at http://www.oracle.com/technetwork/database/database-technologies/nosqldb/documentation/index.html.

Note:

To use the NoSQL Data Cartridge, you must have a license for NoSQL Enterprise Edition.

5.2.1 Oracle CQL Processor Queries

You can use the Oracle Event Processing NoSQL Database data cartridge to refer to data stored in Oracle NoSQL Database as part of an Oracle CQL query. The cartridge makes it possible for queries to retrieve values from an Oracle NoSQL Database store by specifying a key in the query and then referring to fields of the value associated with the key.

When integrating an Oracle NoSQL database, keep the following guidelines in mind:

  • The NoSQL database must have been started through its own mechanisms and must be accessible. It is not managed directly by Oracle Event Processing.

  • This release of the cartridge provides access to the database using release 2.1.54 of the Oracle NoSQL Database API.

  • The property used as a key in queries must be of type String. Joins can use a single key only.

  • Joins must use the equals operator. Other operators are not supported in a join condition.

  • Runaway queries that involve the NoSQL database are not supported. A runaway query has an execution time that takes longer than the execution time estimated by the optimizer.

5.2.2 Data Cartridge Name

The Oracle Event Processing NoSQL cartridge uses the cartridge ID com.oracle.cep.cartridge.nosqldb.

5.2.3 Using a NoSQL Database in Oracle CQL

To use the Oracle Event Processing NoSQL Database data cartridge in a CQL application, you must declare and configure it in one or more application-scoped cartridge contexts for the application.

5.2.3.1 Integrating a NoSQL Database Into an EPN

Integrating an existing NoSQL database is similar to the way you might integrate a table from a relational database. For a NoSQL database, you update the EPN assembly file in the following ways (see the example that follows step 3):

  1. Add namespace declarations to support the store element for referencing the NoSQL data source.

    Your changes should add a namespace schema location to the schemaLocation attribute, along with a namespace and prefix declaration:

    • http://www.oracle.com/ns/oep/nosqldb http://www.oracle.com/ns/oep/nosqldb/oep-nosqldb.xsd

    • xmlns:nosqldb="http://www.oracle.com/ns/oep/nosqldb/"

  2. Add the store element to integrate the NoSQL database into the event processing network as a relation source.

    The store element supports the following attributes, all of which are required:

    • id -- The name that will be used to refer to the key-value store in CQL queries.

    • store-name -- The name of the key-value store, which should match the name specified in the KVStoreConfig class when creating the store.

    • store-locations -- One or more host names and ports of active nodes in the store. The attribute value is a space-separated list in which each entry is formatted as "hostname:port". Nodes with the specified host name and port values will be contacted in order when connecting to the store initially.

    • event-type -- The object type for all objects retrieved for this relation from values in the store. The attribute value should correspond to the name of a wlevs:event-type entry specified in a wlevs:event-type-repository entry.

  3. Add a table-source element to connect the NoSQL database to the processor in which queries will be executed.

The following example illustrates how you can connect an event processing network to a NoSQL database. The store element provides access to a store named "kvstore-customers", using port 5000 on host kvhost-alpha or port 5010 on host kvhost-beta to make the initial connection. It defines Oracle CQL processor P1 and makes the data in the key-value store available to it as a relation named "CustomerDescription".

The store can be referred to within Oracle CQL queries using the name "CustomerDescription". All values retrieved from the store should be serialized instances of the CustomerDescription class.

   <?xml version="1.0" encoding="UTF-8"?>
   <beans xmlns="http://www.springframework.org/schema/beans"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:osgi="http://www.springframework.org/schema/osgi"
          xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
          xmlns:nosqldb="http://www.oracle.com/ns/oep/nosqldb/"
          xsi:schemaLocation="
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans.xsd
     http://www.springframework.org/schema/osgi
     http://www.springframework.org/schema/osgi/spring-osgi.xsd
     http://www.bea.com/ns/wlevs/spring
     http://www.bea.com/ns/wlevs/spring/ocep-epn.xsd
     http://www.oracle.com/ns/oep/nosqldb
     http://www.oracle.com/ns/oep/nosqldb/oep-nosqldb.xsd">

    <!-- Provide access to the CustomerDescription class, which represents 
        the type of values in the store. -->
    <wlevs:event-type-repository>
        <wlevs:event-type type-name="CustomerDescription">
            <wlevs:class>com.bea.wlevs.example.CustomerDescription</wlevs:class>
        </wlevs:event-type>
        <wlevs:event-type type-name="SalesEvent">
            <wlevs:class>com.bea.wlevs.example.SalesEvent</wlevs:class>
        </wlevs:event-type>
    </wlevs:event-type-repository>

    <!-- The store element declares the key-value store, along with the 
        event type to which incoming NoSQL data will be bound. -->
    <nosqldb:store store-name="kvstore-customers"
        store-locations="kvhost-alpha:5000 kvhost-beta:5010"
        id="CustomerDescription"
        event-type="CustomerDescription"/>


    <wlevs:channel id="S1" event-type="SalesEvent">
        <wlevs:listener ref="P1"/>
    </wlevs:channel>

    <!-- The table-source element links the store to the CQL processor. -->
    <wlevs:processor id="P1">
        <wlevs:table-source ref="CustomerDescription" /> 
    </wlevs:processor>

</beans>

If Oracle CQL queries refer to entries in a store specified by a store element, then the values of those entries must be serialized instances of the type specified by the event-type attribute. The event type class must implement java.io.Serializable.

If a query retrieves a value from the store that is not a valid serialized form, or if the value is not the serialized form for the specified class, then Oracle Event Processing throws an exception and event processing is halted. You can declare multiple store elements to return values of different types from the same or different stores.
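
As a sketch of declaring more than one store element, the following fragment exposes two relations backed by the same store. The second id and its event type, OrderHistory, are hypothetical names introduced for illustration; the store name and locations are reused from the example above.

```xml
<!-- Sketch only: OrderHistory is a hypothetical event type that would also
     need an entry in the event type repository. -->
<nosqldb:store store-name="kvstore-customers"
    store-locations="kvhost-alpha:5000 kvhost-beta:5010"
    id="CustomerDescription"
    event-type="CustomerDescription"/>

<nosqldb:store store-name="kvstore-customers"
    store-locations="kvhost-alpha:5000 kvhost-beta:5010"
    id="OrderHistory"
    event-type="OrderHistory"/>
```

Each store element binds retrieved values to a single event type, so entries read through a given id must all be serialized instances of that element's event type.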

5.2.3.2 Using NoSQL Data in Oracle CQL

After you have integrated a NoSQL database into an event processing network, you can access data from Oracle CQL code. The query can look up an entry from the store by specifying an equality relation in the query's WHERE clause.

<n1:config
    xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application wlevs_application_config.xsd"
    xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <processor>
        <name>P1</name>
        <rules>
            <query id="q1"><![CDATA[
                SELECT customerName, creditScore, price, item
                FROM S1 [Now], CustomerDescription as cust
                WHERE S1.userId = cust.userId
                AND creditScore > 5
            ]]></query>
        </rules>
    </processor>
</n1:config>

In this example, the event type instances representing data from the S1 channel and CustomerDescription NoSQL data source are both implemented as JavaBeans classes. Because both event types are JavaBeans classes, the Oracle CQL query can access the customer description associated with a particular event by equating the event's user ID with that of the customer description in the WHERE clause, treating both as JavaBeans properties:

WHERE S1.userId = CustomerDescription.userId

This clause requests that an entry be retrieved from the store that has the key specified by the value of the event's userId field. Only equality relations are supported for obtaining entries from the store.

Once an entry from the store has been selected, fields from the value retrieved from the store can be referred to in the SELECT portion of the query or in additional clauses in the WHERE clause.

Specifying creditScore in the SELECT clause includes, in the query output, the value of the creditScore field of the CustomerDescription object retrieved from the store. The reference to creditScore in the WHERE clause further restricts the query to events for which the value of the CustomerDescription creditScore field is greater than 5.

5.2.3.2.1 Formatting the Key Used to Obtain Entries from the NoSQL Store

The key used to obtain entries from the store can be formatted in one of two ways: by beginning the value with a forward slash ('/') or by omitting a slash.

If the value specified on the left hand side of the equality relation starts with a forward slash, then the key is treated as a full key path that specifies one or more major components, as well as minor components if desired. For more details on the syntax of key paths, see the information about the oracle.kv.Key class in the Oracle NoSQL Database API documentation at http://docs.oracle.com/cd/NOSQL/html/javadoc/index.html.

For example, if the userId field of a SalesEvent object has the value "/users/user42/-/custDesc", then that value will be treated as a full key path that specifies "users" as the first major component, the user ID "user42" as the second major component, and a minor component named "custDesc".

As a convenience, if the value specified on the left-hand side of the equality relation does not start with a forward slash, then it is treated as a single major component that comprises the entire key.

Note that keys used to retrieve entries from the store must be specified in full by a single field accessed by the Oracle CQL query. In particular, if a key path with multiple components is required to access entries in the key-value store, then the full key path expression must be stored in a single field that is accessed by the query.