10 Using Parallel Flow in a BPEL Process
This chapter includes the following sections:
10.1 Introduction to Parallel Flows in BPEL Processes
A BPEL process service component must sometimes gather information from multiple asynchronous sources. Because each callback can take an undefined amount of time (hours or days), it may take too long to call each service one at a time. By breaking the calls into a parallel flow, a BPEL process service component can invoke multiple web services at the same time, and receive the responses as they come in. This method is much more time efficient.
Figure 10-1 shows a flow activity named Retrieve_QuotesFromSuppliers. The Retrieve_QuotesFromSuppliers flow activity sends order information to two suppliers in parallel:
- 
                        An internal warehouse (InternalWarehouseService) 
- 
                        An external partner warehouse (PartnerSupplierMediator) 
The two warehouses return their bids for the order to the flow activity. Here, two asynchronous callbacks execute in parallel. One callback does not have to wait for the other to complete first. Each response is stored in a different global variable.
10.1.1 What You May Need to Know About the Execution of Parallel Flow Branches in a Single Thread
Branches in flow, flowN, and forEach activities are executed serially in a single thread (that is, the Nth branch is executed only after N-1 execution has completed). Execution is not completely parallel. This is because the branches do not execute in concurrent threads in this mode. Instead, one thread starts executing a flow branch until it reaches a blocking activity (for example, an synchronous invoke). At this point, a new thread is created that starts executing the other branch, and the process continues. This creates the impression that the flow branches are executing in parallel. In this mode, however, if the flow branches do not define a blocking activity, the branches still execute serially.
This design is intended for several reasons:
- 
                           To prevent you from accidentally spawning too many threads and overloading the system, single threading is the default method. However, you can tune threads in other places, such as adapter polling threads, BPEL process service engine threads, and Oracle WebLogic Server work managers. 
- 
                           The BPEL process specification does not provide a mechanism to ensure the thread safety of BPEL variables (that is, a lack of a synchronized qualifier such as in Java), which is necessary for true multithreaded programming. 
- 
                           The implication of transaction rollbacks in one of the branches is undefined. 
To achieve pseudo-parallelism, you can configure invoke activities to be nonblocking with the nonBlockingInvoke deployment descriptor property. When this property is set to true, the process manager creates a new thread to perform each branch's invoke activity in parallel.
                     
For more information about the nonBlockingInvoke property, see How to Define Deployment Descriptor Properties in the Property Inspector.
                     
10.2 Creating a Parallel Flow
You can create a parallel flow in a BPEL process service component with the flow activity. The flow activity enables you to specify one or more activities to be performed concurrently. The flow activity also provides synchronization. The flow activity completes when all activities in the flow have finished processing. Completion of this activity includes the possibility that it can be skipped if its enabling condition is false.
Note:
Branches in a flow activity are executed serially in a single thread. For more information, see What You May Need to Know About the Execution of Parallel Flow Branches in a Single Thread.
10.2.2 What Happens When You Create a Parallel Flow
A flow activity typically contains many sequence activities. Each sequence is performed in parallel. The following example shows the syntax for two sequences of the Retrieve_QuotesFromSuppliers flow activity in the OrderProcessor.bpel file after design completion. However, a flow activity can have many sequences. A flow activity can also contain other activities. In the following example, each sequence in the flow contains assign, invoke, and receive activities.
                     
<flow name="Retrieve_QuotesFromSuppliers"> <sequence name="Sequence_4"> <assign name="Assign_InternalWarehouseRequest"> <copy> <from>$inputVariable.gOrderInfoVariable/ns3:CardNum</from> <to>lInternalWarehouseInputVariable/ns4:ccnb</to> </copy> </assign> <invoke name="Invoke_InternalWarehouse" inputVariable="lInternalWarehouseInputVariable" partnerLink="InternalWarehouseService" portType="ns1:InternalWarehouseService" operation="process"/> <receive name="Receive_InternalWarehouse" createInstance="no" variable="lInternalWarehouseResponseVariable" partnerLink="InternalWarehouseService" portType="ns1:InternalWarehouseServiceCallback" operation="processResponse"/> <assign name="Assign_InterWHResponse"> <bpelx:append> <bpelx:from variable="lInternalWarehouseResponseVariable" part="payload" query="/ns1:WarehouseResponse"/> <bpelx:to variable="gWarehouseQuotes" query="/ns1:WarehouseList"/> </bpelx:append> </assign> </sequence> <sequence name="Sequence_4"> <assign name="Assign_PartnerRequest"> <copy> <from variable="gOrderInfoVariable" query="/ns4:orderInfoVOSDO"/> <to variable="lPartnerSupplierInputVariable" part="request" query="/ns4:orderInfoVOSDO"/> </copy> </assign> <invoke name="Invoke_PartnerSupplier" partnerLink="PartnerSupplierMediator" portType="ns15:execute_ptt" operation="execute" inputVariable="lPartnerSupplierInputVariable"/> <receive name="Receive_PartnerResponse" createInstance="no" variable="lPartnerResponseVariable" partnerLink="PartnerSupplierMediator" portType="ns15:callback_ptt" operation="callback"/> <assign name="Assign_PartnerWHResponse"> <bpelx:append> <bpelx:from variable="lPartnerResponseVariable" part="callback" query="/ns1:WarehouseResponse"/> <bpelx:to variable="gWarehouseQuotes" query="/ns1:WarehouseList"/> </bpelx:append> </assign> </sequence> </flow>
10.2.3 Synchronizing the Execution of Activities in a Flow Activity
You can synchronize the execution of activities within a flow activity to ensure that certain activities only execute after other activities have completed. For example, assume you have an invoke activity, verifyFlight, that is executed in parallel with other invoke activities (verifyHotel, verifyCarRental, and scheduleFlight) when the flow activity begins. However, scheduling a flight is necessary only after verifying that a flight is available. Therefore, you can add a link between the verifyFlight and scheduleFlight invoke activities. Links provide a level of dependency indicating that the activity that is the target of the link (scheduleFlight) is only executed if the activity that is the source of the link (verifyFlight) has completed. 
                     
The following example provides details. The link name verifyFlight-To-scheduleFlight is assigned to the source verifyFlight and target scheduleFlight invoke activities. If the source verifyFlight completes execution, the target scheduleFlight is then executed.
                     
<flow ...>
   <links>
      <link name="verifyFlight-To-scheduleFlight" />
   </links>
   <documentation>
      Verify the availability of a flight, hotel, and rental car in parallel
   </documentation>
   <invoke name="verifyFlight" ...>
      <sources>
         <source linkName="verifyFlight-To-scheduleFlight" />
      </sources>
   </invoke>
   <invoke name="verifyHotel" ... />
   <invoke name="verifyCarRental" ... />
   <invoke name="scheduleFlight" ...>
      <targets>
         <target linkName="verifyFlight-To-scheduleFlight" />
      </targets>
   </invoke>
</flow>
The preceding code provides an example of link syntax in BPEL version 2.0. The link syntax between BPEL version 1.1 and BPEL version 2.0 is slightly different.
- 
                           BPEL version 1.1 uses <target>and<source>.
- 
                           BPEL version 2.0 uses <targets>and<sources>.
Table 10-1 provides details.
Table 10-1 Links Syntax in BPEL Version 1.1 and BPEL Version 2.0
| BPEL Version 1.1 Example | BPEL Version 2.0 Example | 
|---|---|
| <flow>
   <links>
     <link name="XtoY"/>
     <link name="CtoD"/>
   </links>
   <sequence name="X">
      <source linkName="XtoY"/>
      <invoke name="A" .../>
      <invoke name="B" .../>
   </sequence>
   <sequence name"Y">
      <target linkName="XtoY"/>
      <receive name="C" ...>
         <source linkName="CtoD"/>
      </receive>
      <invoke name="E" .../>
   </sequence>
   <invoke partnerLink="D" ...>
     <target linkName="CtoD"/>
   </invoke>
 </flow> | <flow>
      <links>
        <link name="AtoB"/>
      </links>
      <assign name="B">
        <targets>
          <target linkName="AtoB"/>
        </targets>
        <copy>
          <from>concat($output.payload,
            'B')</from>
          <to>$output.payload</to>
        </copy>
      </assign>
      <assign name="A">
        <sources>
          <source linkName="AtoB"/>
        </sources>
        <copy>
          <from>concat($output.payload,
           'A')</from>
          <to>$output.payload</to>
        </copy>
      </assign>
    </flow> | 
10.2.4 How to Create Synchronization Between Activities Within a Flow Activity
To create synchronization between activities within a flow activity:
Note:
The Sources and Targets tabs are only available in BPEL 2.0 projects. For BPEL 1.1 projects, you must directly edit the BPEL file to use this functionality.
10.2.5 What Happens When You Create Synchronization Between Activities Within a Flow Activity
The following example shows the .bpel file after design is complete for three flow activities with links for synchronizing activity execution.
                     
- 
                           Flow_1shows a link between simple activities.Flow_1includes a link namedAtoB. The activity that is the target of the link, assign activityB, is only executed if the activity that is the source of the link, assign activityA, has completed.
- 
                           Flow_2shows a link between simple activity and composite activity.Flow_2also includes the link namedAtoB. The activity that is the target of the link, assign activityB, is only executed if the activity that is the source of the link, scope activityscope1, has completed.
- 
                           Flow_3shows a link between composite activities.Flow_3also includes the link namedAtoB. The activity that is the target of the link, sequence activitySequence_1, is only executed if the activity that is the source of the link, scope activityscope2, has completed.
<!-- link between simple activities --> <flow name=Flow_1> <links> <link name="AtoB"/> </links> <assign name="A"> <sources> <source linkName="AtoB"/> </sources> <copy> <from>concat($output.payload, 'A')</from> <to>$output.payload</to> </copy> </assign> <assign name="B"> <targets> <target linkName="AtoB"/> </targets> <copy> <from>concat($output.payload, 'B')</from> <to>$output.payload</to> </copy> </assign> </flow> <!-- link between simple activity and composite activity --> <flow name=Flow_2> <links> <link name="AtoB"/> </links> <scope name="scope1"> <sources> <source linkName="AtoB"/> </sources> <assign name="A"> <copy> <from>concat($output.payload, 'A')</from> <to>$output.payload</to> </copy> </assign> </scope> <assign name="B"> <targets> <target linkName="AtoB"/> </targets> <copy> <from>concat($output.payload, 'B')</from> <to>$output.payload</to> </copy> </assign> </flow> <!-- link between composite activities --> <flow name=Flow_3> <links> <link name="AtoB"/> </links> <scope name="scope2"> <sources> <source linkName="AtoB"/> </sources> <assign name="A"> <copy> <from>concat($output.payload, 'A')</from> <to>$output.payload</to> </copy> </assign> </scope> <sequence name="Sequence_1> <targets> <target linkName="AtoB"/> </targets> <assign name="B"> <copy> <from>concat($output.payload, 'B')</from> <to>$output.payload</to> </copy> </assign> </sequence> </flow> </sequence>
10.2.6 What You May Need to Know About Join Conditions in Target Activities
You can specify an optional join condition in target activities. The value of the join condition is a boolean expression. If a join condition is not specified, the join condition is the disjunction (that is, a logical OR operation) of the link status of all incoming links of this activity.
Oracle BPEL Designer does not provide design support for adding join conditions. To add a join condition, you must manually add the condition to the .bpel file in Source view in Oracle BPEL Designer. 
                     
The following provides an example of a join condition.
<flow> <links> <link name="linkStatus2"/> </links> <empty name="E2"> <sources> <source linkName="linkStatus2"> <transitionCondition>false()</transitionCondition> </source> </sources> </empty> <empty name="E2"> <targets> <joinCondition>bpws:getLinkStatus('linkStatus2')=true()</joinCondition> <target linkName="linkStatus2"/> </targets> </empty> </flow>
10.3 Customizing the Number of Parallel Branches
This section describes how to customize the number of parallel branches with the following activities:
- 
                        A forEach activity in a BPEL version 2.0 project 
- 
                        A flowN activity in a BPEL version 1.1 project 
Note:
Branches in flowN and forEach activities are executed serially in a single thread. For more information, see What You May Need to Know About the Execution of Parallel Flow Branches in a Single Thread.
10.3.1 Processing Multiple Sets of Activities with the forEach Activity in BPEL 2.0
You can use a forEach activity to process multiple sets of activities sequentially or in parallel. The forEach activity executes a contained (child) scope activity exactly N+1 times, where N equals a final counter value minus a starting counter value that you specify in the Counter Values tab of the For Each dialog. While other structured activities such as a flow activity can have any type of activity as its contained activity, the forEach activity can only include a scope activity.
When the forEach activity is started, the expressions you specify for the starting counter and final counter values are evaluated. Once the two values are returned, they remain constant for the lifecycle of the activity. Both expressions must return a value containing at least one character. If these expressions do not return valid values, a fault is thrown. If the starting counter value is greater than the final counter value, the contained scope activity is not performed and the forEach activity is considered complete.
During each iteration, the variable specified in the Counter Name field on the General tab is implicitly declared in the forEach activity's contained scope. During the first iteration of the scope, the counter variable is initialized with the starting counter value. The next iteration causes the counter variable to be initialized with the starting counter value, plus one. Each subsequent iteration increments the previously initialized counter variable value by one until the final iteration, where the counter is set to the final counter value. The counter variable is local to the enclosed scope activity. Although its value can be changed during an iteration, that value is lost after each iteration. Therefore, the counter variable value does not impact the value of the next iteration's counter.
The forEach activity supports the following looping iterations:
- 
                           Sequential (default) The forEach activity performs looping iterations sequentially N times over a given set of activities defined within a scope activity. As an example, the forEach activity iterates over an incoming purchase order message where the purchase order message consists of N order items. The enclosed scope activity must be executed N+1 times, with each instance starting only after the previous iteration has completed. 
- 
                           Parallel All looping iterations are started at the same time and processed in parallel. Parallel iterations are useful in environments in which sets of independent data are processed or independent interaction with different partners is performed in parallel. To enable parallel looping, you select the Parallel Execution check box on the General tab. In these scenarios, execution of the N+1 instances of the contained scope activity occurs in parallel. Each copy of the scope activity has the same counter variable that you specify in the Counter Name field of the General tab declared in the same way as specified for a sequential forEach activity. Each instance's counter variable must be uniquely initialized in parallel with one of the integer values beginning with the starting counter value and proceeding up to and including the final counter value. Unlike a flow activity, the number of parallel branches is not known at design time with the forEach activity. The specified counter variable iterates through the number of parallel branches, controlled by the starting counter value and final counter value. 
You can also specify a completion condition on the Completion tab. This condition enables the forEach activity to execute the condition and complete without executing or finishing all the branches specified. As an example, you send out parallel requests and a sufficient subset of the recipients have responded. A completion condition is optionally specified to prevent the following:
- 
                           Some children from executing (in the sequential case) 
- 
                           To force early termination of some of the children (in the parallel case) 
If you do not specify a completion condition, the forEach activity completes when the contained scope has completed.
If a premature termination occurs (due to a fault or the completion condition evaluating to true), then the N+1 requirement does not apply. 
                     
The following example shows the forEach activity syntax.
<forEach counterName="MyVariableName" parallel="yes|no"
   standard-attributes>
   standard-elements
   <startCounterValue expressionLanguage="anyURI"?>
      unsigned-integer-expression
   </startCounterValue>
   <finalCounterValue expressionLanguage="anyURI"?>
      unsigned-integer-expression
   </finalCounterValue>
   <completionCondition>?
      <branches expressionLanguage="anyURI"?
         successfulBranchesOnly="yes|no"?>?
         unsigned-integer-expression
      </branches>
   </completionCondition>
   <scope ..>...</scope>
</forEach>Note:
The successfulBranchesOnly attribute is not supported for this release.
                        
10.3.1.2 What Happens When You Create a forEach Activity
The following example shows the .bpel file after design is complete for a sequential forEach activity.
                        
<faultHandlers>
    <catch faultName="bpel:invalidBranchCondition">
<sequence>
  <assign>
    <copy>
      <from>'invalidBranchCondition happened'</from>
      <to>$output.payload</to>
    </copy>
  </assign>
  <reply name="replyOutput" partnerLink="client"
      portType="tns:Test" operation="process" variable="output"/>
</sequence>
    </catch>
  </faultHandlers>
  <sequence>
    <!-- pick input from requester -->
    <receive name="receive" createInstance="yes"
             partnerLink="client" portType="tns:Test"
             operation="process" variable="input"/>
    <assign>
      <copy>
        <from>3</from>
        <to>$request.payload</to>
      </copy>
      <copy>
        <from>''</from>
        <to>$output.payload</to>
      </copy>
    </assign>
    <forEach counterName="i" parallel="no">
      <startCounterValue>$input.payload/tns:startCounter+1</startCounterValue>
      <finalCounterValue>$input.payload/tns:finalCounter+1</finalCounterValue>
      <completionCondition>
        <branches>$input.payload/tns:branches+1</branches>
      </completionCondition>
      <scope name="scope1">
        <partnerLinks>
          <partnerLink name="DummyService" partnerLinkType="tns:DummyService"
              myRole="DummyServiceClient" partnerRole="DummyServiceProvider"/>
        </partnerLinks>
        <sequence>
          <assign>
            <copy>
              <from>concat($output.payload, $i, 'A')</from>
              <to>$output.payload</to>
            </copy>
          </assign>
          <invoke name="invokeDummyService" partnerLink="DummyService"
              portType="tns:DummyPortType"
              operation="initiate" inputVariable="request"/>
          <receive name="receiveFromDummyService" partnerLink="DummyService"
              portType="tns:DummyCallbackPortType"
              operation="onResult" variable="response"/>          <assign>
            <copy>
              <from>concat($output.payload, $i, 'B')</from>
              <to>$output.payload</to>
            </copy>
          </assign>
        </sequence>
      </scope>
    </forEach>
    <!-- respond output to requester -->
    <reply name="replyOutput" partnerLink="client"
        portType="tns:Test" operation="process" variable="output"/>
  </sequence>
The following example shows the .bpel file after design is complete for a parallel forEach activity.
                        
<sequence>
    <!-- pick input from requester -->
    <receive name="receive" createInstance="yes"
             partnerLink="client" portType="tns:Test"
             operation="process" variable="input"/>
    <assign>
      <copy>
        <from>$input.payload/tns:value1</from>
        <to>$request.payload</to>
      </copy>
      <copy>
        <from>''</from>
        <to>$output.payload</to>
      </copy>
    </assign>
    <forEach counterName="i" parallel="yes">
      <startCounterValue>($input.payload/tns:value1 + 1)</startCounterValue>
      <finalCounterValue>($input.payload/tns:value2 + 2)</finalCounterValue>
      <scope name="scope1">
        <partnerLinks>
          <partnerLink name="DummyService" partnerLinkType="tns:DummyService"
              myRole="DummyServiceClient" partnerRole="DummyServiceProvider"/>
        </partnerLinks>
        <sequence>
          <assign>
            <copy>
              <from>concat($output.payload, 'A')</from>
              <to>$output.payload</to>
            </copy>
          </assign>
          <invoke name="invokeDummyService" partnerLink="DummyService"
              portType="tns:DummyPortType"
              operation="initiate" inputVariable="request"/>
          <receive name="receiveFromDummyService" partnerLink="DummyService"
              portType="tns:DummyCallbackPortType"
              operation="onResult" variable="response"/>
          <assign>
            <copy>
              <from>concat($output.payload, 'B')</from>
              <to>$output.payload</to>
            </copy>
          </assign>
        </sequence>
      </scope>
    </forEach>
    <!-- respond output to requester -->
    <reply name="replyOutput" partnerLink="client"
        portType="tns:Test" operation="process" variable="output"/>
  </sequence>10.3.2 Customizing the Number of Flow Activities with the flowN Activity in BPEL 1.1
In the flow activity, the BPEL code determines the number of parallel branches. However, often the number of branches required is different depending on the available information. The flowN activity creates multiple flows equal to the value of N, which is defined at runtime based on the data available and logic within the process. An index variable increments each time a new branch is created, until the index variable reaches the value of N.
                     
The flowN activity performs activities on an arbitrary number of data elements. As the number of elements changes, the BPEL process service component adjusts accordingly.
The branches created by flowN perform the same activities, but use different data. Each branch uses the index variable to look up input variables. The index variable can be used in the XPath expression to acquire the data specific for that branch.
For example, suppose there is an array of data. The BPEL process service component uses a count function to determine the number of elements in the array. The process then sets N to be the number of elements. The index variable starts at a preset value (zero is the default), and flowN creates branches to retrieve each element of the array and perform activities using data contained in that element. These branches are generated and performed in parallel, using all the values between the initial index value and N. The flowN activity terminates when the index variable reaches the value of N. For example, if the array contains 3 elements, N is set to 3. Assuming the index variable begins at 1, the flowN activity creates three parallel branches with indexes 1, 2, and 3.
                     
The flowN activity can use data from other sources as well, including data obtained from web services.
Figure 10-15 shows the runtime flow of a flowN activity in Oracle Enterprise Manager Fusion Middleware Control that looks up three hotels. This is different from the view, because instead of showing the BPEL process service component, it shows how the process has actually executed. In this case, there are three hotels, but the number of branches changes to match the number of hotels available.
Figure 10-15 Oracle Enterprise Manager Fusion Middleware Control View of the Execution of a flowN activity

Description of "Figure 10-15 Oracle Enterprise Manager Fusion Middleware Control View of the Execution of a flowN activity"
10.3.2.2 What Happens When You Create a FlowN Activity
The following code shows the .bpel file that uses the flowN activity to look up information on an arbitrary number of hotels.
                        
The following example shows the sequence name.
  <sequence name="main">
  <!-- Received input from requester. 
    Note: This maps to operation defined in NflowHotels.wsdl
    The requester sends a set of hotels names wrapped into the "inputVariable"
    -->
The following actions take place. A receive activity calls the client partner link to get the information that the flowN activity must define N times and look up the hotel information. The following provides an example:
                        
    <receive name="receiveInput" partnerLink="client"
 portType="client:NflowHotels" operation="initiate" variable="inputVariable"
 createInstance="yes"/>
    <!-- 
       The 'count()' Xpath function is used to get the number of hotelName
       noded passed in.
       An intermediate variable called "NbParallelFlow" is
       used to store the number of N flows being executed
       -->
    <assign name="getHotelsN">
      <copy>
        <from 
expression="count($InputVariable.payload/client:HotelName);"/>
        <to variable="NbParallelFlow"/>
      </copy>
    </assign>
    <!-- Initiating the FlowN activity
        The N value is initialized with the value stored in the
 "NbParallelFlow" variable
        The variable call "Index" is defined as the index variable
        NOTE: Both "NbParallelFlow" and "Index" variables have to be declared
        -->
The flowN activity begins next. After defining a name for the activity of flowN, N is defined as a value from the inputVariable, which is the number of hotel entries. The activity also assigns index as the index variable. The following provides an example:
                        
<bpelx:flowN name="FlowN" N="bpws:getVariableData('NbParallelFlow')
indexVariable="Index'>
      <sequence name="Sequence_1">
        <!-- Fetching each hotelName by indexing the "inputVariable" with the
 "Index" variable.
            Note the usage of the  "concat()" Xpath function to create the
 expression accessing the array element.
            -->
The copy rule shown in the following example then uses the index variable to concatenate the hotel entries into a list:
<assign name="setHotelId">
  <copy>
    <from expression=
"bpws:getVariableData('inputVariable','payload',concat('/client:Nflo
wHotelsProcessRequest/client:ListOfHotels/client:HotelName[',
bpws:getVariableData('Index'),']'))"/>
            <to variable="InvokeHotelDetailInputVariable" part="payload"
 query="/ns2:hotelInfoRequest/ns2:id"/>
          </copy>
        </assign>
Using the hotel information, an invoke activity looks up detailed information for each hotel through a web service. The following provides an example:
 <!-- For each hotel, invoke the web service giving detailed information
 on the hotel -->
        <invoke name="InvokeHotelDetail" partnerLink="getHotelDetail"
 portType="ns2:getHotelDetail" operation="process"
 inputVariable="InvokeHotelDetailInputVariable"
 outputVariable="InvokeHotelDetailOutputVariable"/>
      </sequence>
    </bpelx:flowN>
Finally, the BPEL process sends detailed information on each hotel to the client partner link. The following provides an example:
<invoke name="callbackClient" partnerLink="client" portType="client:NflowHotelsCallback" operation="onResult" inputVariable="outputVariable"/> </sequence> </sequence>















