A channel represents the logical conduit through which events flow between other types of components (stages), for example, between adapters and Oracle CQL processors, or between Oracle CQL processors and event beans.
Channels provide buffering, queuing, and concurrency capabilities that enable you to tune the performance of your application later in the design life cycle. By default, the channel max-threads setting is 0, which means the channel is in pass-through mode and incurs no performance penalty.
When constructing your EPN, consider the following rules:
A channel is mandatory when you connect an Oracle CQL processor to a downstream stage.
A channel is mandatory when you connect a stream or relation to an Oracle CQL processor.
Based on the previous two points, a channel is mandatory between an adapter and a processor. When you use Oracle JDeveloper to connect an adapter to a processor, the channel wizard displays so that you can create the channel. See Getting Started with Event Processing for Oracle Stream Explorer.
A channel is optional when you connect any of the following components to an Oracle CQL processor: an external relation, cache, or table source.
A channel is not needed between a pull source, such as a cache or table, and a processor, because the pull source represents an external relation. The only valid operation on an external relation is a join with a stream that uses a NOW window operator, which is why the external relation is considered a pull source. The join actually happens outside of the Oracle CQL processor. Because the data is pulled, the Oracle CQL processor does not need to be aware of its shape (that is, no DDL is required) and does not need a channel to act as an intermediary.
In general, use a channel between components when:
Buffering is needed between the emitting component and the receiver.
Queueing or concurrency is needed for the receiving component.
A custom adapter is used and thread control is necessary.
When you add a channel to your Event Processing Network (EPN), it has a default configuration. The default channel has a name and an ID, is a system time-stamped stream channel, and has a heartbeat time out of 100 milliseconds (100,000,000 nanoseconds).
The default configuration is adequate for most applications. You can modify the configuration by editing the application assembly file or the component configuration file.
When a channel is time stamped by the system, Oracle Stream Explorer assigns a new time from the CPU clock when a new event arrives and when the configurable heartbeat time out expires.
When a channel is time stamped by an application, the time stamp of an event is determined by the wlevs:expression element. A common example of an expression is a reference to a property on the event. If no expression is specified, the time stamp can be propagated from a prior event. This happens, for example, when a system time-stamped channel of one Oracle CQL processor feeds events into an application time-stamped channel of another, downstream Oracle CQL processor. In addition, an application can use the StreamSender.sendHeartbeat method to send an event of type heart-beat downstream to StreamSink listeners in the EPN.
Note:
When a channel is both application time stamped and map-based (uses a hash map event type), Oracle Stream Explorer adds a time stamp. A delete or update operation without a key does not work on a channel with this configuration, because application time-stamped events hold an ever-changing timestamp property.
This chapter describes some of the assembly and configuration file channel settings. For a complete reference, see Schema Reference for Oracle Stream Explorer.
The assembly file shows the channel settings for the helloworldInputChannel. The settings indicate that helloworldProcessor listens to the channel for events, and that events flow into the channel from helloworldAdapter.
<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" >
    <wlevs:listener ref="helloworldProcessor"/>
    <wlevs:source ref="helloworldAdapter"/>
</wlevs:channel>
To configure the channel as a relation, add the is-relation setting to the assembly file as follows:
<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" is-relation="true" primary-key="myprimarykey" />
If you make the channel a relation, you must also configure the primary-key attribute. The primary key is a list of event property names, separated by white space or commas, that uniquely identifies each event. See wlevs:metadata in Schema Reference for Oracle Stream Explorer for information about how to define a primary key.
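To make the primary-key rule concrete, the following minimal sketch parses a primary-key value that uses white space or commas as separators and then keys a relation by those properties, so an event with an existing key replaces the old one. RelationKeySketch and its methods are hypothetical, events are modeled as plain property maps, and this is not how Oracle Stream Explorer stores relations internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: RelationKeySketch is hypothetical, not Oracle Stream Explorer API.
class RelationKeySketch {

    // Split a primary-key value such as "symbol, exchange" or "symbol exchange"
    // into property names; white space and commas are both separators.
    static List<String> parsePrimaryKey(String primaryKey) {
        List<String> names = new ArrayList<>();
        for (String token : primaryKey.trim().split("[\\s,]+")) {
            if (!token.isEmpty()) {
                names.add(token);
            }
        }
        return names;
    }

    // The key of an event is the list of its values for the key properties.
    static List<Object> keyOf(Map<String, Object> event, List<String> keyNames) {
        List<Object> key = new ArrayList<>();
        for (String name : keyNames) {
            key.add(event.get(name));
        }
        return key;
    }

    private final List<String> keyNames;
    // At most one event per key, because the key uniquely identifies an event.
    private final Map<List<Object>, Map<String, Object>> rows = new HashMap<>();

    RelationKeySketch(String primaryKey) {
        this.keyNames = parsePrimaryKey(primaryKey);
    }

    // Insert a new event, or replace the event that has the same key.
    void upsert(Map<String, Object> event) {
        rows.put(keyOf(event, keyNames), event);
    }

    // Delete the event whose key matches.
    void delete(Map<String, Object> event) {
        rows.remove(keyOf(event, keyNames));
    }

    int size() {
        return rows.size();
    }
}
```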
To configure the channel to be application time stamped, add the application-timestamped and expression elements to the assembly file as follows. When you set the is-total-order attribute to true, the application time published is always strictly greater than the last value used.
<wlevs:application-timestamped is-total-order="true">
    <wlevs:expression>mytime+10</wlevs:expression>
</wlevs:application-timestamped>
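To show what the expression and is-total-order express, the following sketch evaluates the equivalent of mytime+10 on each event and checks whether the resulting times are strictly increasing, which is the guarantee that is-total-order="true" states. It does not reproduce the engine's behavior. AppTimestampSketch, its Event class, and its methods are hypothetical.

```java
import java.util.List;
import java.util.function.ToLongFunction;

// Sketch only: hypothetical names that mimic <wlevs:expression>mytime+10</wlevs:expression>.
class AppTimestampSketch {

    // Hypothetical event type exposing the mytime property used by the expression.
    static final class Event {
        final long mytime;
        Event(long mytime) { this.mytime = mytime; }
    }

    // Java equivalent of the expression mytime+10.
    static final ToLongFunction<Event> EXPRESSION = e -> e.mytime + 10;

    // With is-total-order="true", every published time must be strictly greater
    // than the previous one. Returns false at the first time that is not.
    static boolean isTotallyOrdered(List<Event> events) {
        boolean first = true;
        long last = 0;
        for (Event e : events) {
            long ts = EXPRESSION.applyAsLong(e);
            if (!first && ts <= last) {
                return false;
            }
            last = ts;
            first = false;
        }
        return true;
    }
}
```

Equal consecutive times fail the check, because "strictly greater" excludes ties.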
The configuration file shows the channel configuration settings. The settings customize the channel to buffer up to 10000 events and process them asynchronously (max-size), to use a maximum of 4 threads (max-threads), and to use a heartbeat time out of 10000 nanoseconds (heartbeat).
<channel>
    <name>helloworldInputChannel</name>
    <max-size>10000</max-size>
    <max-threads>4</max-threads>
    <heartbeat>10000</heartbeat>
</channel>
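As an analogy for max-size and max-threads, the following sketch uses java.util.concurrent. A max-threads value of 0 means the sender's own thread calls the listener (pass-through). A positive value means events wait in a bounded queue of max-size slots and a fixed pool of worker threads delivers them. BufferedChannelSketch is hypothetical, and this is not how Oracle Stream Explorer implements channels.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only: a hypothetical analogy for max-size and max-threads.
class BufferedChannelSketch<E> implements AutoCloseable {
    private final Consumer<E> listener;
    private final ThreadPoolExecutor pool; // null means pass-through mode

    BufferedChannelSketch(int maxSize, int maxThreads, Consumer<E> listener) {
        this.listener = listener;
        if (maxThreads == 0) {
            this.pool = null; // max-threads 0: the sender's thread calls the listener
        } else {
            // max-threads > 0: events wait in a bounded queue of maxSize slots
            // (maxSize must be at least 1) and maxThreads workers deliver them
            this.pool = new ThreadPoolExecutor(maxThreads, maxThreads, 0L,
                    TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(maxSize));
        }
    }

    // Hands an event to the listener. In asynchronous mode this throws
    // RejectedExecutionException when the queue is full and all workers are busy.
    void send(E event) {
        if (pool == null) {
            listener.accept(event);
        } else {
            pool.execute(() -> listener.accept(event));
        }
    }

    // Stops accepting events and waits up to 10 seconds for queued ones.
    @Override
    public void close() {
        if (pool == null) {
            return;
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this analogy, the asynchronous mode gives you the tuning knobs (queue depth and thread count), and the pass-through mode avoids any hand-off cost. That mirrors the trade-off the settings above describe.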
If you configure an Oracle CQL processor with more than one query, then by default, all queries send their results to the downstream channel. You can control which queries send their results to the downstream channel with the selector element.
Figure 5-1 shows an EPN with channel filteredStream connected to the upstream Oracle CQL processor, filteredFanoutProcessor.
Figure 5-1 EPN With Oracle CQL Processor and Downstream Channel
The following example shows the queries configured for the Oracle CQL processor.
<processor>
    <name>filterFanoutProcessor</name>
    <rules>
        <query id="Yr3Sector"><![CDATA[
            select cusip, bid, srcId, bidQty, ask, askQty, seq
            from priceStream where sector="3_YEAR"
        ]]></query>
        <query id="Yr2Sector"><![CDATA[
            select cusip, bid, srcId, bidQty, ask, askQty, seq
            from priceStream where sector="2_YEAR"
        ]]></query>
        <query id="Yr1Sector"><![CDATA[
            select cusip, bid, srcId, bidQty, ask, askQty, seq
            from priceStream where sector="1_YEAR"
        ]]></query>
    </rules>
</processor>
If you specify more than one query for an Oracle CQL processor, then by default, all query results are output to the Oracle CQL processor outbound channel (filteredStream in Figure 5-1). Optionally, in the component configuration file, you can use the selector child element of the channel element to specify a space-delimited list of one or more Oracle CQL query names whose results are output to the channel. In the following example, results for queries Yr3Sector and Yr2Sector are output to filteredStream, but results for query Yr1Sector are not.
<channel> <name>filteredStream</name> <selector>Yr3Sector Yr2Sector</selector> </channel>
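The selector rule can be stated as a small predicate. A query's results reach the channel if its name appears in the space-delimited list. With no selector, every query's results reach it. SelectorSketch is a hypothetical model of that documented behavior, not product code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch only: hypothetical model of the selector child element.
class SelectorSketch {

    // Returns true if results of the query named queryId are output to the channel.
    // A null or blank selector models the default: results of every query pass.
    static boolean accepts(String selector, String queryId) {
        if (selector == null || selector.trim().isEmpty()) {
            return true;
        }
        Set<String> names = new HashSet<>(Arrays.asList(selector.trim().split("\\s+")));
        return names.contains(queryId);
    }
}
```

With the selector value Yr3Sector Yr2Sector, accepts returns true for Yr3Sector and Yr2Sector and false for Yr1Sector, which matches the filteredStream example.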
You can configure a channel element with a selector before you create the queries in the upstream Oracle CQL processor. In this case, the queries that you later create must have names that match the names in the selector.
Note:
The selector child element is only applicable if the upstream stage is an Oracle CQL processor. For more information, see Oracle CQL Processors.
By default, a channel processes events as they arrive. You can configure a channel to batch events that have the same time stamp and were output from the same query by setting the wlevs:channel attribute batching to true. Batching events can improve application performance.
<wlevs:channel id="priceStream" event-type="PriceEvent" batching="true">
<wlevs:listener ref="filterFanoutProcessor" />
<wlevs:source ref="PriceAdapter" />
</wlevs:channel>
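The following minimal sketch illustrates the grouping rule behind batching="true": output events that share a time stamp and come from the same query go into one batch. It assumes that only consecutive events are grouped, which the text does not state. BatchingSketch and its Output class are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: hypothetical grouping of query output into batches.
class BatchingSketch {

    // Hypothetical output event: time stamp, producing query, and payload.
    static final class Output {
        final long timestamp;
        final String queryId;
        final Object payload;
        Output(long timestamp, String queryId, Object payload) {
            this.timestamp = timestamp;
            this.queryId = queryId;
            this.payload = payload;
        }
    }

    // Groups consecutive outputs with the same time stamp and query into one batch.
    static List<List<Output>> batch(List<Output> outputs) {
        List<List<Output>> batches = new ArrayList<>();
        List<Output> current = new ArrayList<>();
        for (Output o : outputs) {
            if (!current.isEmpty()) {
                Output head = current.get(0);
                boolean sameBatch = head.timestamp == o.timestamp
                        && head.queryId.equals(o.queryId);
                if (!sameBatch) {
                    batches.add(current); // close the batch and start a new one
                    current = new ArrayList<>();
                }
            }
            current.add(o);
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

In this sketch, fewer, larger deliveries replace one delivery per event. That is the kind of saving that makes batching a possible performance gain.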
For related information, see Implement RelationSender, and see batching in Schema Reference for Oracle Stream Explorer.
You can write code to handle exceptions that occur in stages that are downstream from a channel and thrown to the channel. By default, the fault-handling behavior for a channel is as follows:
If the channel max-threads setting is 0 (a pass-through channel), then the exception is thrown again to the next upstream stage in the EPN.
If the channel max-threads setting is greater than 0, then the exception is logged and dropped. Any events associated with the fault are also logged and dropped.
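The two default rules can be modeled as one delivery function. It rethrows when max-threads is 0 and otherwise logs and drops both the exception and the event. DefaultFaultPolicySketch is hypothetical, and a List of strings stands in for the server log.

```java
import java.util.List;
import java.util.function.Consumer;

// Sketch only: hypothetical model of the default fault-handling rules.
class DefaultFaultPolicySketch {

    // Delivers an event to a downstream stage through a channel with the given
    // max-threads value. Returns true if delivered, false if dropped.
    static <E> boolean deliver(int maxThreads, E event, Consumer<E> downstream, List<String> log) {
        try {
            downstream.accept(event);
            return true;
        } catch (RuntimeException ex) {
            if (maxThreads == 0) {
                throw ex; // pass-through channel: rethrow to the upstream stage
            }
            // multithreaded channel: log, then drop the exception and the event
            log.add("dropped " + event + ": " + ex.getMessage());
            return false;
        }
    }
}
```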
You can write a fault handling class and associate the handler with a channel whose max-threads value is greater than 0. With a fault handler associated with the channel, exceptions thrown to the channel are received by the handler, which contains code to either handle the fault or throw it again. If your fault handling code throws the exception again, the exception is logged, but events related to the exception are lost. If you want to keep track of events involved in these exceptions, you must persist them with your code, such as by writing the event data to a data source connected to your EPN.
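The "persist, then rethrow" pattern could look like the following sketch. ChannelFaultHandler is a hypothetical stand-in, not the product's fault-handler interface (see Fault Handling for that). The in-memory List stands in for a data source connected to the EPN.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: hypothetical interface, not the Oracle Stream Explorer fault-handler API.
interface ChannelFaultHandler {
    void handleFault(RuntimeException cause, Object[] events);
}

// Saves the events involved in each fault, then rethrows so the fault is still
// logged. The list stands in for a data source connected to the EPN.
class PersistingFaultHandler implements ChannelFaultHandler {
    final List<Object> persisted = new ArrayList<>();

    @Override
    public void handleFault(RuntimeException cause, Object[] events) {
        persisted.addAll(Arrays.asList(events)); // keep the events before they are lost
        throw cause;                              // rethrow the original exception
    }
}
```

Saving the events before rethrowing is the important ordering here, because once the exception is thrown again the channel no longer keeps the events.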
Note:
To handle an exception thrown by a multithreaded channel, the fault handler must be registered in a component that is upstream from the channel, such as a processor. If you do not register the fault handler with an upstream component, the exception is passed upstream, but the fault handler is not invoked.
For information on writing fault handlers, see Fault Handling.
By default, a channel broadcasts each event to every listener. When you configure a channel to use an EventPartitioner, each time an incoming event arrives, the channel selects a listener and dispatches the event to that listener instead of broadcasting each event to every listener. You can use an EventPartitioner on a channel to improve scalability.
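The following sketch contrasts broadcast with partitioned dispatch. Broadcast delivers each event to every listener, while dispatch delivers each event to exactly one listener. The round-robin choice is an assumption made for illustration, since a real EventPartitioner decides the target itself. PartitionedDispatchSketch is hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

// Sketch only: hypothetical channel contrasting broadcast and partitioned dispatch.
class PartitionedDispatchSketch<E> {
    private final List<Consumer<E>> listeners;
    private int next = 0; // round-robin position (an assumed policy)

    PartitionedDispatchSketch(List<Consumer<E>> listeners) {
        this.listeners = listeners;
    }

    // Default behavior: every listener receives the event.
    void broadcast(E event) {
        for (Consumer<E> listener : listeners) {
            listener.accept(event);
        }
    }

    // Partitioned behavior: exactly one listener receives the event.
    void dispatch(E event) {
        listeners.get(next).accept(event);
        next = (next + 1) % listeners.size();
    }
}
```

With N listeners, broadcast multiplies the work by N. Partitioned dispatch spreads the events so that each listener handles about 1/N of them, which is where the scalability gain comes from.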
For more information, see Customizing Oracle Stream Explorer.
The static structure of an EPN can be represented as a directed acyclic graph, that is, a pair (N, C), where N is a set of nodes (vertices) and C is a two-place relation over N representing the connections (arcs) from a source node to a destination node. This is known as a distributed flow.
Distributed Flow refers to a set of events in an application. A distributed flow serves the same purpose as that of variables or parameters in computer programming languages. The distributed flow represents a state that is communicated by a software layer. A distributed flow is dynamic in nature. A distributed flow expresses the high-level logical relationship between parts of distributed protocols.
A few scenarios in which distributed flows can be used are listed below:
Events that are not extremely latency bound
Events that can be logically partitioned
Events that can be logically divided into separate individual components/tasks that can be executed in parallel.
Properties of Distributed Flow
A distributed flow usually has the following properties:
Asynchronous and one-way — each event represents a single instance
Homogeneous and uniform — all events in the flow are uniform and are similar in nature.
Concurrent and distributed — the events in the flow occur at different times (concurrent) and at different nodes (distributed).
A few real-life scenarios in which distributed flow can be used are listed below:
Word Count
In a scenario where you need to count the number of words, the application maps incoming sentences into meaningful terms and then reduces these terms to a count per term. With stream processing, you can count a real-time flow of words, such as the stream coming from Twitter. However, if this is done with a single, non-distributed stream, handling such a high volume of words becomes an issue. With distributed processing, the application subscribes to disparate parallel Twitter streams and converges the results (the word counts).
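The map and reduce steps can be sketched with Java streams. Each partition (standing in for one of the parallel Twitter streams) is counted independently. The per-term counts are then merged into one result. WordCountSketch is hypothetical, and splitting on non-word characters is an assumed tokenization.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: hypothetical map/reduce word count over parallel partitions.
class WordCountSketch {

    // Map step: split one partition's sentences into lower-case terms and count them.
    static Map<String, Long> countPartition(List<String> sentences) {
        Map<String, Long> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String term : sentence.toLowerCase().split("\\W+")) {
                if (!term.isEmpty()) {
                    counts.merge(term, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    // Reduce step: count partitions in parallel, then converge the per-term counts.
    static Map<String, Long> countAll(List<List<String>> partitions) {
        return partitions.parallelStream()
                .map(WordCountSketch::countPartition)
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum));
    }
}
```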
Smart Meter Energy Consumption
It is common for households today to measure their energy consumption with smart meters located on their premises. Typically, these smart meters output energy consumption sensor data in the form of events periodically throughout the day (for example, every minute). This sensor data is captured by downstream management systems in regional processing centers and is used to calculate useful descriptive statistics, such as the average energy consumption of a house or neighborhood, and how it relates to historical data for the region. These running aggregations can be partitioned to calculate the energy consumption efficiently. A distributed flow can be used to identify outliers (for example, households whose consumption is above or below the general consumption) and to predict future consumption. This data can be used to control the buying and selling of energy with partners.
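The following minimal sketch shows the partitioned aggregation and outlier idea. It keeps a running sum and count per household (the partition key), computes each household's average, and flags households whose average differs from the mean of all household averages by more than a given fraction. The tolerance rule is an assumption chosen for illustration, not a method stated in the text. SmartMeterSketch and Reading are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: hypothetical per-household aggregation with a simple outlier rule.
class SmartMeterSketch {

    static final class Reading {
        final String householdId;
        final double kwh;
        Reading(String householdId, double kwh) {
            this.householdId = householdId;
            this.kwh = kwh;
        }
    }

    // Running aggregate per partition (household): [sum, count].
    static Map<String, double[]> aggregate(List<Reading> readings) {
        Map<String, double[]> agg = new HashMap<>();
        for (Reading r : readings) {
            double[] sumCount = agg.computeIfAbsent(r.householdId, k -> new double[2]);
            sumCount[0] += r.kwh;
            sumCount[1] += 1;
        }
        return agg;
    }

    // Households whose average is more than tolerance * overall mean away from the overall mean.
    static Set<String> outliers(List<Reading> readings, double tolerance) {
        Map<String, Double> averages = new HashMap<>();
        for (Map.Entry<String, double[]> e : aggregate(readings).entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        double overall = averages.values().stream()
                .mapToDouble(Double::doubleValue).average().orElse(0.0);
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Double> e : averages.entrySet()) {
            if (Math.abs(e.getValue() - overall) > tolerance * overall) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```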
Risk Analysis
Distributed flow can be used for real-time calculation of the exposure of financial portfolios, and thus for risk analysis.
Local partitioning is a technique for partitioning the events within a distributed flow. In a local partition channel, you must use an ordered set of event properties as the partition criteria. Channel partitioning appears in the management console as part of the channel and its configuration. See Figure 5-2.
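To illustrate an ordered set of event properties used as partition criteria, the following sketch builds a key from the property values in the given order and maps the key to one of a fixed number of partitions. As a result, events with equal key values always land in the same partition. The hash-modulo mapping, and treating the partition count as related to max-threads, are assumptions. LocalPartitionSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: hypothetical mapping of events to partitions by ordered criteria.
class LocalPartitionSketch {

    // Returns a partition in [0, partitionCount) for an event modeled as a property map.
    static int partitionOf(Map<String, Object> event, List<String> criteria, int partitionCount) {
        List<Object> key = new ArrayList<>();
        for (String property : criteria) {
            key.add(event.get(property)); // the order of the criteria shapes the key
        }
        return Math.floorMod(key.hashCode(), partitionCount); // never negative
    }
}
```

Because the partition depends only on the criteria properties, two events for the same symbol and exchange map to the same partition even when their prices differ.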
Because local partitioning aims to improve input throughput, measure its effect as the number of events per second that an adapter can send to the processor.
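The events-per-second measurement reduces to simple arithmetic around a timer. The following sketch times how long a sender takes to push a number of events into a sink and converts that time to a rate. ThroughputSketch is hypothetical, and the Consumer sink stands in for the channel in front of the processor.

```java
import java.util.function.Consumer;

// Sketch only: hypothetical throughput measurement.
class ThroughputSketch {

    // Events per second, given an event count and the elapsed time in nanoseconds.
    static double eventsPerSecond(long events, long elapsedNanos) {
        return events * 1_000_000_000.0 / elapsedNanos;
    }

    // Times how long it takes to send count events into sink.
    static double measure(long count, Consumer<Long> sink) {
        long start = System.nanoTime();
        for (long i = 0; i < count; i++) {
            sink.accept(i);
        }
        long elapsed = System.nanoTime() - start;
        return eventsPerSecond(count, Math.max(elapsed, 1)); // avoid dividing by zero
    }
}
```

For example, 500 events sent in 250,000,000 nanoseconds (0.25 seconds) is 2000 events per second.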
Note:
You must choose the max-threads value appropriately based on the hardware capacity of the machine you are using.
Figure 5-2 EPN with Oracle CQL Processor and Downstream Channel
Security
Secure Sockets Layer (SSL) is used to secure network traffic between the distributed EPN nodes.
Configuration
You must configure the Oracle CQL Processor to support local partitioning. See Define a Local Partition Channel and Configure an Oracle CQL Processor for Parallel Query Execution.
Examples
Here are a few examples of a simple EPN:
Example 5-1 EPN Example 1
An event is defined as a relation P of pairs (PN, PV), where PN is a property name and PV is a property value. For the purpose of this document, there is no need to define the domain of property names and property values.
EPN1 = ({adapter1, channel1, processor, channel2, adapter2}, {(adapter1,channel1), (channel1, processor), (processor, channel2), (channel2, adapter2)})
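To check that EPN1 really is the directed acyclic graph (N, C) described earlier, the following sketch encodes N as a set of node names and C as a list of (source, destination) arcs. It then runs Kahn's topological-sort algorithm, which visits every node only when no cycle exists. EpnGraphSketch is hypothetical and assumes that every arc refers to nodes in N.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: hypothetical acyclicity check for an EPN given as (N, C).
class EpnGraphSketch {

    // Returns true if the graph has no cycle (Kahn's algorithm).
    // Assumes every arc {source, destination} refers to nodes in the node set.
    static boolean isAcyclic(Set<String> nodes, List<String[]> arcs) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> successors = new HashMap<>();
        for (String n : nodes) {
            inDegree.put(n, 0);
            successors.put(n, new ArrayList<>());
        }
        for (String[] arc : arcs) {
            successors.get(arc[0]).add(arc[1]);
            inDegree.merge(arc[1], 1, Integer::sum);
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey()); // nodes with no incoming arcs, such as adapter1
            }
        }
        int visited = 0;
        while (!ready.isEmpty()) {
            String n = ready.poll();
            visited++;
            for (String m : successors.get(n)) {
                if (inDegree.merge(m, -1, Integer::sum) == 0) {
                    ready.add(m);
                }
            }
        }
        return visited == nodes.size(); // nodes left unvisited lie on a cycle
    }
}
```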
Example 5-2 EPN Example 2
As an EPN node may contain more than one event, we define the set E as an ordered sequence of events.
Note:
E is ordered, which is different from other cases.
The runtime state S = (N, E) of the EPN can thus be presented as a two-place relation from N to E. Note that the relation S is not injective, meaning that the same event(s) may be present in more than one node. However, it is surjective, because every event in the total set of events in the EPN must be in at least one node.
e1 = {(price, 10), (volume, 200), (symbol, 'ORCL')} e2 = {(p1, v1), (p2, v2), (p3, v3)}
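The following sketch encodes e1 and e2 as property-name-to-value maps. It models the runtime state S as a map from each node to its ordered sequence of events, then checks the two properties stated above. S may be non-injective (the same event at several nodes) but must be surjective (every event at one node or more). EpnStateSketch is hypothetical, and it compares events by content, so two events with identical properties count as one.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: hypothetical checks on the runtime state S = (N, E).
class EpnStateSketch {

    // Injective: no event is present at more than one node.
    static boolean isInjective(Map<String, List<Map<String, Object>>> state) {
        Set<Map<String, Object>> seen = new HashSet<>();
        for (List<Map<String, Object>> events : state.values()) {
            for (Map<String, Object> e : new HashSet<>(events)) { // ignore repeats within one node
                if (!seen.add(e)) {
                    return false;
                }
            }
        }
        return true;
    }

    // Surjective: every event in the EPN is present at one node or more.
    static boolean isSurjective(Map<String, List<Map<String, Object>>> state,
                                Collection<Map<String, Object>> allEvents) {
        Set<Map<String, Object>> covered = new HashSet<>();
        for (List<Map<String, Object>> events : state.values()) {
            covered.addAll(events);
        }
        return covered.containsAll(allEvents);
    }
}
```

If e1 sits in both channel1 and processor, S is not injective. S is surjective as long as e1 and e2 each appear at some node.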