Hadoop is an open source technology that provides access to large data sets that are distributed across clusters. One strength of the Hadoop software is that it provides access to large quantities of data not stored in a relational database.
The content in this guide assumes that you are already familiar with, and likely running, a Hadoop system. If you need more information about Hadoop, you might start with the Hadoop project web site at
Hadoop functionality in Oracle Event Processing applications is supported only on Unix-based environments.
This chapter includes the following sections:
You can use the Hadoop data cartridge to integrate an existing Hadoop data source into an event processing network. With the data source integrated, you can write Oracle CQL query code that incorporates data from files on the Hadoop system.
When integrating a Hadoop system, keep the following guidelines in mind:
The Hadoop cluster must have been started through its own mechanism and must be accessible. The cluster is not managed directly by Oracle Event Processing.
A file from a Hadoop system supports joins on only a single key in Oracle CQL. Any property of the associated event type can serve as that key, with the exception of a property whose type is byte array; in particular, the key is not restricted to String properties.
Joins must use the equals operator. Other operators are not supported in a join condition.
For the event type you define to represent data from the Hadoop file, only tuple-based event types are supported.
The order of properties in the event type specification must match the order of fields in the Hadoop file.
Only the following mappings between Oracle CQL and Hadoop types are supported. Any other type causes a configuration exception to be raised.
To understand how a Hadoop data source might be used with an Oracle Event Processing application, consider a scenario with an application that requires quick access to a very large amount of customer purchase data in real time.
In this case, the data stored in Hadoop includes all purchases by all customers from all stores. Values in the data include customer identifiers, store identifiers, product identifiers, and so on. The purchase data includes information about which products are selling best in each store. To render the data to a manageable state, a MapReduce function is used to examine the data and produce a list of top buyers (those to whom incentives will be sent).
This data is collected and managed by a mobile application vendor as part of a service designed to send product recommendations and incentives (including coupons) to customers. The data is collected from multiple retailers and maintained separately for each retailer.
The Oracle Event Processing application provides the middle-tier logic for a client-side mobile application that is designed to offer purchase incentives to top buyers. It works in the following way:
Retailers arrange with the mobile application vendor to provide purchase data as part of a program to offer incentives to top buyers. The data, regularly refreshed from store sales data, is stored in a Hadoop system and a MapReduce function is used to identify top buyers.
The mobile application vendor provides the application for download, noting which retailers support the program.
App users each create a user ID that is correlated by the app vendor to data about customers from the retailers.
The mobile application is designed to send location data to the Oracle Event Processing application, along with the user ID. This information -- location coordinates and user ID -- forms the event data received by the Oracle Event Processing application.
As the Oracle Event Processing application receives event data from the mobile application, it uses Oracle CQL queries to:
Determine whether the user is near a store from a participating retailer.
Establish (from Hadoop-based data) whether the user is a top buyer for the retailer.
Locate purchase information related to that user as a buyer from that retailer.
If the user is a top buyer, the application correlates products previously purchased with incentives currently being offered to buyers of those products.
The Oracle Event Processing application pushes an incentive announcement to the user.
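The query logic in the steps above can be sketched in Oracle CQL. In this sketch, the stream name (LocationStream), the Hadoop-backed file name (TopBuyers), and all property names are hypothetical; the essential point is the single-key equijoin between the incoming stream and the Hadoop file, using the equals operator as described earlier.

```
-- Hypothetical sketch: match incoming location events against a
-- Hadoop-backed TopBuyers file on the customer ID (single-key equijoin).
SELECT loc.userId, buyer.storeId, buyer.incentiveId
FROM LocationStream [Now] AS loc, TopBuyers AS buyer
WHERE loc.userId = buyer.userId
```

A real application would add the proximity test against store locations and the correlation with current incentives in further queries.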
You use the Hadoop support included with Oracle Event Processing by integrating a file in an existing Hadoop system into an event processing network. With the file integrated, you have access to data in the file from Oracle CQL code.
This section describes the following:
To use Hadoop from Oracle Event Processing, you must first make configuration changes on both the Oracle Event Processing and Hadoop servers:
On the Oracle Event Processing server, add the following Hadoop configuration files to the server's bootclasspath: core-site.xml, hdfs.xml, and mapred.xml.
On the Hadoop server, copy the Pig JAR file to the lib directory and include it in the HADOOP_CLASSPATH defined in the hadoop-env.sh file.
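The hadoop-env.sh change can be sketched as follows; the JAR file name and location are assumptions, so match them to your installation:

```shell
# Append the Pig JAR (name and path assumed here) to the Hadoop
# classpath so it is visible to jobs started by the Hadoop server.
export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${HADOOP_HOME}/lib/pig.jar"
```
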
A connection with a Hadoop data source through the cartridge might require many input/output operations, such that undeploying the application can time out or generate errors that prevent the application from being deployed again. Before undeploying an application that uses a Hadoop cartridge, be sure to discontinue event flow into the application.
Integrating a file from an existing Hadoop system is similar to the way you might integrate a table from an existing relational database. For a Hadoop file, you use the file XML element from the Oracle Event Processing schema added specifically for Hadoop support.
The file element is from the http://www.oracle.com/ns/ocep/hadoop namespace, so your EPN assembly file must reference that namespace. The file element includes the following attributes:
id -- Uniquely identifies the file in the EPN. You will use this attribute's value to reference the data source in a processor.
event-type -- A reference to the event-type to which data from the file should be bound. The event-type must be defined in the EPN.
path -- The path to the file in the Hadoop file system.
separator -- Optional. The character delimiter to use when parsing the lines in the Hadoop file into separate fields. The default delimiter is the comma (',') character.
operation-timeout -- Optional. The maximum amount of time, in milliseconds, to wait for the operation to complete.
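Putting these attributes together, a hadoop:file declaration that uses the optional attributes might look like the following sketch. The separator and timeout values are illustrative assumptions, not defaults:

```
<!-- Sketch: a tab-separated Hadoop file bound to the CustomerDescription
     event type, with a 10-second operation timeout (values assumed). -->
<hadoop:file id="CustomerDescription"
             event-type="CustomerDescription"
             path="CustomerDescription.txt"
             separator="&#9;"
             operation-timeout="10000"/>
```

If your file uses the default comma delimiter, you can omit the separator attribute entirely.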
Once the Hadoop file to integrate is specified with the file element, you use the table-source element to add the file as a data source for the Oracle CQL processor in which you will use the file's data.
In Example 18-1, "EPN Integrating a File from Hadoop", note that the http://www.oracle.com/ns/ocep/hadoop namespace (and hadoop prefix) is referenced in the beans element. The file element references a CustomerDescription.txt file for data, along with a CustomerDescription event type defined in the event type repository.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:wlevs="http://www.bea.com/ns/wlevs/spring"
       xmlns:hadoop="http://www.oracle.com/ns/ocep/hadoop/"
       xsi:schemaLocation="
         http://www.bea.com/ns/wlevs/spring
         http://www.bea.com/ns/wlevs/spring/spring-wlevs-v11_1_1_6.xsd
         http://www.oracle.com/ns/ocep/hadoop
         http://www.oracle.com/ns/ocep/hadoop/ocep-hadoop.xsd">

  <!-- Some schema references omitted for brevity. -->

  <!-- Event types that will be used in the query. -->
  <wlevs:event-type-repository>
    <wlevs:event-type type-name="SalesEvent">
      <wlevs:class>com.bea.wlevs.example.SalesEvent</wlevs:class>
    </wlevs:event-type>
    <wlevs:event-type type-name="CustomerDescription">
      <wlevs:properties>
        <wlevs:property name="userId" type="char"/>
        <wlevs:property name="creditScore" type="int"/>
        <wlevs:property name="address" type="char"/>
        <wlevs:property name="customerName" type="char"/>
      </wlevs:properties>
    </wlevs:event-type>
  </wlevs:event-type-repository>

  <!-- Input adapter omitted for brevity. -->

  <!-- Channel sending SalesEvent instances to the processor. -->
  <wlevs:channel id="S1" event-type="SalesEvent">
    <wlevs:listener ref="P1"/>
  </wlevs:channel>

  <!-- The file element to integrate the CustomerDescription.txt file
       from the Hadoop system into the EPN. -->
  <hadoop:file id="CustomerDescription" event-type="CustomerDescription"
      path="CustomerDescription.txt"/>

  <!-- The file from the Hadoop system tied into the query processor
       with the table-source element. -->
  <wlevs:processor id="P1">
    <wlevs:table-source ref="CustomerDescription"/>
  </wlevs:processor>

  <!-- Other stages omitted for brevity. -->
</beans>
After you have integrated a Hadoop file into an event processing network, you can query the file from Oracle CQL code.
Example 18-1, "EPN Integrating a File from Hadoop" illustrates how you can add a file from a Hadoop system into an EPN. With the file added to the EPN, you can query it from Oracle CQL code, as shown in Example 18-2, "Oracle CQL Query Using Data from a Hadoop File".
In the following example, the processor receives SalesEvent instances from a channel, but also has access to a file in the Hadoop system as CustomerDescription instances. The Hadoop file is essentially a CSV file that lists customers. Both event types have a userId property, which serves as the join key.
<n1:config xmlns:n1="http://www.bea.com/ns/wlevs/config/application"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.bea.com/ns/wlevs/config/application
             wlevs_application_config.xsd">
  <processor>
    <name>P1</name>
    <rules>
      <query id="q1"><![CDATA[
        SELECT customerName, creditScore, price, item
        FROM S1 [Now], CustomerDescription AS cust
        WHERE S1.userId = cust.userId
          AND S1.price > 1000
      ]]></query>
    </rules>
  </processor>
</n1:config>