5 Channels

A channel represents the logical conduit through which events flow between other types of components (stages). For example, between adapters and Oracle CQL processors or between Oracle CQL processors and event beans.


5.1 When to Use a Channel

Channels provide buffering, queuing, and concurrency capabilities that enable you to tune the performance of your application later in the design life cycle.

By default, the channel max-threads attribute is set to 0, which means the channel is in pass-through mode and incurs no performance penalty.

When constructing your EPN, consider the following rules:

  • A channel is mandatory when you connect an Oracle CQL processor to a downstream stage.

  • A channel is mandatory when you connect a stream or relation to an Oracle CQL processor.

    Based on the previous two rules, a channel is mandatory between an adapter and a processor. When you use Oracle JDeveloper to connect an adapter to a processor, the channel wizard opens so that you can create the channel.

  • A channel is optional when you connect any of the following components to an Oracle CQL processor: an external relation, cache, or table source.

A channel is not needed between a pull source, such as a cache or table, and a processor because the pull source represents an external relation. For an external relation, the only valid operation is a join between a stream and a NOW window operator, and hence it is considered a pull source. The join actually happens outside of the Oracle CQL processor. Because it is a pull, the Oracle CQL processor does not need to be aware of its shape (that is, no DDL is required) and does not need the channel to act as intermediary.

In general, use a channel between components when:

  • Buffering is needed between the emitting component and the receiver.

  • Queueing or concurrency is needed for the receiving component.

  • A custom adapter is used and thread control is necessary.

5.2 Channel Configuration

When you add a channel to your Event Processing Network (EPN), it has a default configuration. The default channel has a name, an ID, is a system time-stamped stream channel, and has a default heartbeat time out of 100 milliseconds or 100,000,000 nanoseconds.
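The two figures quoted for the default heartbeat are the same duration in different units. As a quick check of that arithmetic, the following minimal sketch uses only the standard java.util.concurrent.TimeUnit class. The class and method names (HeartbeatUnits, millisToNanos) are hypothetical and are not part of Oracle Stream Analytics.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
final class HeartbeatUnits {
    // Converts a heartbeat time out given in milliseconds to nanoseconds.
    static long millisToNanos(long millis) {
        return TimeUnit.MILLISECONDS.toNanos(millis);
    }

    public static void main(String[] args) {
        // Prints the default heartbeat time out (100 ms) in nanoseconds.
        System.out.println(millisToNanos(100));
    }
}
```

A helper like this can be useful when you reason about a time out in milliseconds but need to supply the value in nanoseconds.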

The default configuration is adequate for most applications. You can modify the configuration by editing the application assembly file or by editing the component configuration file.

When a channel is time stamped by the system, Oracle Stream Analytics assigns a new time from the CPU clock when a new event arrives and when the configurable heartbeat time out expires.

When a channel is time stamped by an application, the time stamp of an event is determined by the wlevs:expression element. A common example of an expression is a reference to a property on the event. If no expression is specified, then the time stamp can be propagated from a prior event. This happens, for example, when a system time-stamped channel from one Oracle CQL processor feeds events into an application time-stamped channel of a downstream Oracle CQL processor. In addition, an application can use the StreamSender.sendHeartbeat method to send an event of type heart-beat downstream to StreamSink listeners in the EPN.

Note:

When a channel is both application time stamped and map-based (uses a hash map event type), Oracle Stream Analytics adds a time stamp. A delete or update operation without a key does not work on a channel with this configuration because application time stamped events hold an always changing timestamp property.

This chapter describes some of the assembly and configuration file channel settings.

5.2.1 Assembly File

The assembly file shows the channel settings for the helloworldInputChannel. The settings indicate that helloworldProcessor listens to the channel for events, and that events flow into the channel from helloworldAdapter.

<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent" >
   <wlevs:listener ref="helloworldProcessor"/>
   <wlevs:source ref="helloworldAdapter"/>
</wlevs:channel>

To configure the channel as a relation, add the is-relation setting to the assembly file as follows:

<wlevs:channel id="helloworldInputChannel" event-type="HelloWorldEvent"
is-relation="true" primary-key="myprimarykey" />

If you make the channel a relation, you must also configure the primary-key attribute. The primary key is a list of event property names separated by white space or a comma that uniquely identifies each event.
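To make the accepted primary-key format concrete, the following sketch splits an attribute value on white space, commas, or both. It only illustrates the format described above. How Oracle Stream Analytics parses the attribute internally is not shown here, and the class name PrimaryKeyList and the sample property names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class PrimaryKeyList {
    // Splits a primary-key attribute value into event property names.
    // Names may be separated by white space, commas, or a mix of both.
    static List<String> parse(String attributeValue) {
        List<String> names = new ArrayList<>();
        for (String token : attributeValue.trim().split("[\\s,]+")) {
            if (!token.isEmpty()) {
                names.add(token);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(parse("cusip, srcId seq"));
    }
}
```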

To configure the channel to be application time stamped, add the application-timestamped and expression elements to the assembly file as follows. When you set the is-total-order element to true, the application time published is always strictly greater than the last value used.

<wlevs:application-timestamped is-total-order="true">
   <wlevs:expression>mytime+10</wlevs:expression>
</wlevs:application-timestamped>
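Before you set is-total-order to true, it can help to confirm that the time stamps your expression produces really are strictly increasing. The following sketch is one way to check a sample of values offline. The class and method names are hypothetical, and the check is not something Oracle Stream Analytics runs for you.

```java
// Hypothetical helper, for illustration only.
final class TotalOrderCheck {
    // Returns true if every time stamp is strictly greater than the one before it.
    static boolean isStrictlyIncreasing(long[] timestamps) {
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] <= timestamps[i - 1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isStrictlyIncreasing(new long[] {10, 11, 15}));
        System.out.println(isStrictlyIncreasing(new long[] {10, 10, 12}));
    }
}
```

A repeated value fails the check, because the condition is strictly greater, not greater or equal.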

5.2.2 Configuration File

The configuration file shows the channel configuration settings. The settings customize the channel to buffer events and process them asynchronously (max-size), to use a maximum of 4 threads (max-threads), and to use a heartbeat time out of 10000 nanoseconds (heartbeat).

<channel>
    <name>helloworldInputChannel</name>
    <max-size>10000</max-size>
    <max-threads>4</max-threads>
    <heartbeat>10000</heartbeat>
</channel>

5.3 Control Which Queries Output to a Downstream Channel

If you configure an Oracle CQL processor with more than one query, then by default, all queries send their results to the downstream channel. You can control which queries send their results to the downstream channel with the selector element.

Figure 5-1 shows an EPN with channel filteredStream connected to the upstream Oracle CQL processor, filterFanoutProcessor.

Figure 5-1 EPN With Oracle CQL Processor and Downstream Channel


The following example shows the queries configured for the Oracle CQL processor.

<processor>
    <name>filterFanoutProcessor</name>
    <rules>
        <query id="Yr3Sector"><![CDATA[
            select cusip, bid, srcId, bidQty, ask, askQty, seq
            from priceStream where sector="3_YEAR"
        ]]></query>
        <query id="Yr2Sector"><![CDATA[
            select cusip, bid, srcId, bidQty, ask, askQty, seq
            from priceStream where sector="2_YEAR"
        ]]></query>
        <query id="Yr1Sector"><![CDATA[
            select cusip, bid, srcId, bidQty, ask, askQty, seq
            from priceStream where sector="1_YEAR"
        ]]></query>
    </rules>
</processor>

If you specify more than one query for an Oracle CQL processor, then by default, all query results are output to the Oracle CQL processor outbound channel (filteredStream in Figure 5-1). Optionally, in the component configuration source, you can use the selector child element of the channel element to specify a space-delimited list of one or more Oracle CQL query names that can output their results to the channel. In the following example, the results of queries Yr3Sector and Yr2Sector are output to filteredStream, but the results of query Yr1Sector are not.

<channel>
    <name>filteredStream</name>
    <selector>Yr3Sector Yr2Sector</selector>
</channel>

You can configure a channel element with a selector before you create the queries in the upstream Oracle CQL processor. In this case, you must specify query names that match the names in the selector.
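The effect of a selector can be pictured as a membership test on query names. The following sketch models that rule under stated assumptions: with no selector, every query passes, and otherwise only the named queries pass. The class SelectorSketch and its methods are hypothetical and do not reflect how Oracle Stream Analytics implements the selector.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of the selector rule, for illustration only.
final class SelectorSketch {
    // Parses a space-delimited selector value into a set of query names.
    static Set<String> parseSelector(String selector) {
        Set<String> names = new LinkedHashSet<>();
        for (String token : selector.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                names.add(token);
            }
        }
        return names;
    }

    // A query's results pass when no selector is configured (the default),
    // or when the query name appears in the selector.
    static boolean passes(String selector, String queryId) {
        if (selector == null || selector.trim().isEmpty()) {
            return true;
        }
        return parseSelector(selector).contains(queryId);
    }

    public static void main(String[] args) {
        String selector = "Yr3Sector Yr2Sector";
        System.out.println(passes(selector, "Yr3Sector"));
        System.out.println(passes(selector, "Yr1Sector"));
    }
}
```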

Note:

The selector child element is only applicable if the upstream stage is an Oracle CQL processor. For more information, see Oracle CQL Processors.

5.4 Batch Processing Channels

By default, a channel processes events as they arrive. You can configure a channel to batch events that have the same time stamp and were output from the same query by setting the wlevs:channel attribute batching to true.

Batching events can improve application performance.

<wlevs:channel id="priceStream" event-type="PriceEvent" batching="true">
    <wlevs:listener ref="filterFanoutProcessor" />
    <wlevs:source ref="PriceAdapter" />
</wlevs:channel>
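As a rough mental model of batching, the following sketch groups consecutive events that share both a query and a time stamp. It is an assumption-laden illustration, not the channel's actual algorithm. The Out class, its fields, and the batch method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of batching, for illustration only.
final class BatchingSketch {
    // A hypothetical output event: the query that produced it and its time stamp.
    static final class Out {
        final String queryId;
        final long timestamp;
        final String payload;

        Out(String queryId, long timestamp, String payload) {
            this.queryId = queryId;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Groups consecutive events with the same query and time stamp into one batch.
    static List<List<Out>> batch(List<Out> events) {
        List<List<Out>> batches = new ArrayList<>();
        List<Out> current = new ArrayList<>();
        for (Out e : events) {
            if (!current.isEmpty()) {
                Out first = current.get(0);
                boolean sameBatch = first.queryId.equals(e.queryId)
                        && first.timestamp == e.timestamp;
                if (!sameBatch) {
                    batches.add(current);
                    current = new ArrayList<>();
                }
            }
            current.add(e);
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

In this model, a downstream listener would receive one call per batch instead of one call per event, which is where the performance benefit would come from.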

See also Implement RelationSender.

5.5 Fault Handling

You can write code to handle exceptions that occur in stages that are downstream from a channel and thrown to the channel.

By default, the fault-handling behavior for a channel is as follows:

  • If the channel max-threads setting is 0 (a pass-through channel), then the exception is thrown again to the next upstream stage in the EPN.

  • If the channel max-threads setting is greater than 0, then the exception is logged and dropped. Any events associated with the fault are also logged and dropped.
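The two default behaviors above can be summarized in a small model. The following sketch is not Oracle Stream Analytics code. ChannelFaultSketch, its log field, and onDownstreamFault are hypothetical names used only to show the branch on max-threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the default fault-handling rule, for illustration only.
final class ChannelFaultSketch {
    final int maxThreads;
    final List<String> log = new ArrayList<>();

    ChannelFaultSketch(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    // Models what happens when a downstream stage throws while handling an event.
    void onDownstreamFault(RuntimeException fault, Object event) {
        if (maxThreads == 0) {
            // Pass-through channel: the exception is thrown again to the upstream stage.
            throw fault;
        }
        // Multithreaded channel: the exception and the event are logged, then dropped.
        log.add("dropped " + event + ": " + fault.getMessage());
    }
}
```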

You can write a fault handling class and associate the handler with a channel with max-threads values that are greater than 0. With a fault handler associated with the channel, exceptions thrown to the channel are received by the handler, which contains code to either handle the fault or throw it again. If your fault handling code throws the exception again, the exception is logged, but events related to the exception are lost. If you want to keep track of events involved in these exceptions, you must persist them with your code, such as by writing the event data to a data source connected to your EPN.

Note:

To handle an exception thrown by a multithreaded channel, the fault handler must be registered in a component that is upstream from the channel such as a processor. If you do not register the fault handler with an upstream component, the exception is passed upstream, but the fault handler is not invoked.

For information on writing fault handlers, see Fault Handling.

5.6 EventPartitioner Channels

By default, a channel broadcasts each event to every listener.

When you configure a channel to use an EventPartitioner, each time an incoming event arrives, the channel selects a listener and dispatches the event to that listener instead of broadcasting each event to every listener. You can use an EventPartitioner on a channel to improve scalability.
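The difference between broadcasting and partitioning can be seen by counting deliveries. The following sketch contrasts the two dispatch styles with a simple hash-based choice of listener. It does not use the Oracle Stream Analytics EventPartitioner API. DispatchSketch and its methods are hypothetical, and hashing is only one possible selection strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of broadcast versus partitioned dispatch, for illustration only.
final class DispatchSketch {
    private static List<List<String>> emptyInboxes(int listeners) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < listeners; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    // Broadcast: every listener receives every event.
    static List<List<String>> broadcast(List<String> events, int listeners) {
        List<List<String>> inboxes = emptyInboxes(listeners);
        for (String e : events) {
            for (List<String> inbox : inboxes) {
                inbox.add(e);
            }
        }
        return inboxes;
    }

    // Partitioned: each event goes to exactly one listener, chosen by hash.
    static List<List<String>> partition(List<String> events, int listeners) {
        List<List<String>> inboxes = emptyInboxes(listeners);
        for (String e : events) {
            inboxes.get(Math.floorMod(e.hashCode(), listeners)).add(e);
        }
        return inboxes;
    }
}
```

With n listeners, broadcast delivers each event n times, while partitioning delivers it once. That reduction in work per listener is the basis for the scalability benefit.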

5.7 Distributed Flows

The static relationship of an EPN can be represented as an acyclic directed graph, that is, any pair (N,C), where N is a set of the nodes (vertices), and C is a two-place relation over N representing the connections (arcs) from a source node to a destination node.

This is known as a distributed flow.

Distributed Flow refers to a set of events in an application. A distributed flow serves the same purpose as that of variables or parameters in computer programming languages. The distributed flow represents a state that is communicated by a software layer. A distributed flow is dynamic in nature. A distributed flow expresses the high-level logical relationship between parts of distributed protocols.

A few scenarios in which distributed flows can be used are listed below:

  • Events that are not extremely latency bound

  • Events that can be logically partitioned

  • Events that can be logically divided into separate individual components/tasks that can be executed in parallel.

Properties of Distributed Flow

A distributed flow usually has the following properties:

  1. Asynchronous and one-way — each event represents a single instance

  2. Homogeneous and uniform — all events in the flow are uniform and are similar in nature.

  3. Concurrent and distributed — the events in the flow occur at different times (concurrent) and at different nodes (distributed).

5.7.1 Examples of Distributed Flows

A few real-life scenarios in which a distributed flow can be used are listed below:

Word Count

In a scenario where you need to count the number of words, the application maps incoming sentences into meaningful terms and then reduces these terms to a count (per term). With stream processing, you can count a real-time flow of words, such as a stream coming from Twitter. However, handling such a high volume of words in a single stream becomes an issue. Distributed processing subscribes to disparate parallel Twitter streams and converges the results (number of words).
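The map and reduce steps of the word count scenario can be sketched in plain Java. In this illustration, each list of sentences stands in for one parallel stream, and the merge step converges the partial counts. WordCountSketch and its methods are hypothetical names, and the splitting rule (any run of non-word characters) is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical word count sketch, for illustration only.
final class WordCountSketch {
    // Map step: counts the terms seen in one partition of sentences.
    static Map<String, Integer> countPartition(List<String> sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Reduce step: converges the per-partition counts into one total per term.
    static Map<String, Integer> merge(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }
}
```

Because each partition is counted independently, the map step can run in parallel on separate nodes, and only the much smaller partial counts need to be brought together.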

Smart Meter Energy Consumption

It is common for households today to collect their energy consumption through the use of smart meters located on their premises. Typically, these smart meters output energy consumption sensor data in the form of events periodically throughout the day (for example, every minute). This sensor data is captured by downstream management systems in regional processing centers and is used to calculate useful descriptive statistics, such as the average energy consumption of a house or neighborhood, and how it relates to historical data for the region. These running aggregations can be partitioned to efficiently calculate the energy consumption. A distributed flow can be used to identify outliers (for example, households that are above or below the general consumption) and to predict future consumption. This data can be used to control the buying and selling process of energy with their partners.

Risk Analysis

A distributed flow can be used to calculate the exposure of a financial portfolio in real time and thus analyze its risks.

5.7.2 Local Partitioning Channel

Local Partitioning is a technique that helps in partitioning the events within a distributed flow. In a local partition channel, you must use an ordered set of event properties as the partition criteria. The channel partitioning is present in the management console as a part of the channel and the channel configuration. See Figure 5-2.

Because local partitioning aims to improve input throughput, measure its effect as the number of events per second that an adapter is able to send to the processor.
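To illustrate what an ordered set of event properties as partition criteria can mean in practice, the following sketch builds a composite key from the listed properties, in order, and hashes it to a partition index. This is an assumption about one reasonable scheme, not a description of how Oracle Stream Analytics assigns partitions. LocalPartitionSketch and partitionFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical partition-key sketch, for illustration only.
final class LocalPartitionSketch {
    // Builds a key from the ordered partition properties and maps it to a partition.
    static int partitionFor(Map<String, Object> event, List<String> orderedProps, int partitions) {
        List<Object> key = new ArrayList<>();
        for (String prop : orderedProps) {
            key.add(event.get(prop)); // order matters: it defines the composite key
        }
        return Math.floorMod(key.hashCode(), partitions);
    }
}
```

Events that agree on every partition property always land in the same partition, regardless of their other properties, so per-key state stays on one thread.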

Note:

You must choose the max-threads value appropriately based on the hardware capacity of the machine you are using.

Figure 5-2 EPN with Oracle CQL Processor and Downstream Channel


Security

Secure Sockets Layer (SSL) is used to secure network traffic between the distributed EPN nodes.

Configuration

You must configure the Oracle CQL Processor to support local partitioning. See Define a Local Partition Channel and Configure an Oracle CQL Processor for Parallel Query Execution.

Examples

Here are a few examples of a simple EPN:

Example 5-1 EPN Example 1

An event is defined as the relation P of any pair (PN, PV), representing property names and property values. For the purpose of this document, there is no need to define the domain of property names and property values.

EPN1 = ({adapter1, channel1, processor, channel2, adapter2},
{(adapter1,channel1), (channel1, processor), (processor, channel2), (channel2, adapter2)})
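The pair notation used for EPN1 maps directly onto a set of nodes and a list of arcs. The following sketch checks that such a graph is acyclic by using Kahn's algorithm, a standard topological-sort technique. EpnGraph and isAcyclic are hypothetical names. The sketch assumes every arc refers to nodes that are in the node set.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical graph check, for illustration only.
final class EpnGraph {
    // Returns true if the directed graph (nodes, arcs) contains no cycle.
    // Each arc is a two-element array: {source, destination}.
    static boolean isAcyclic(Set<String> nodes, List<String[]> arcs) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> next = new HashMap<>();
        for (String n : nodes) {
            inDegree.put(n, 0);
            next.put(n, new ArrayList<>());
        }
        for (String[] arc : arcs) {
            next.get(arc[0]).add(arc[1]);
            inDegree.merge(arc[1], 1, Integer::sum);
        }
        // Start from nodes with no incoming arcs and remove them one by one.
        Deque<String> ready = new ArrayDeque<>();
        for (String n : nodes) {
            if (inDegree.get(n) == 0) {
                ready.add(n);
            }
        }
        int visited = 0;
        while (!ready.isEmpty()) {
            String n = ready.remove();
            visited++;
            for (String m : next.get(n)) {
                if (inDegree.merge(m, -1, Integer::sum) == 0) {
                    ready.add(m);
                }
            }
        }
        // If a cycle exists, its nodes never reach in-degree 0 and are never visited.
        return visited == nodes.size();
    }
}
```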

Example 5-2 EPN Example 2

As an EPN node may contain more than one event, we define the set E as an ordered sequence of events.

Note:

E is ordered, which is different from other cases.

The runtime state S = (N,E) of the EPN can thus be presented as a two-place relation from N to E. Note that the relation S is not injective, meaning that the same event(s) may be present in more than one node. However, it is surjective, as all events of the total set of events in the EPN must be in at least one node.

e1 = {(price, 10), (volume, 200), (symbol, 'ORCL')}
e2 = {(p1, v1), (p2, v2), (p3, v3)}