Mule 4 Mulesoft Basics Mulesoft Tutorial

Parallel For Each in Mule 4

The Parallel For-Each scope enables you to process a collection of messages by splitting the collection into parts that are simultaneously processed in separate routes. After all messages are processed, the results are aggregated following the same order they were in before the split, and then the flow continues.

In the below tutorial we will see who we can use parallel for each in you project.

Download Parallel For Each Example

Syntax:

<parallel-foreach doc:name="parallel For Each" collection="payload">
<!-- Code to be Processed parallel -->
</parallel-foreach>

Parallel For Each

In this example we will send a JSON message as an array, which will be split by parallel for each and executed in parallel.
Inside parallel for each we are transforming the message received with a delay of 5 sec, so that we can clearly see in logs in our API has processed messages in parallel or not.

<sub-flow name="addUsersParallelForEach" doc:id="5820e110-740b-48aa-baf2-b4f0fa68716a" >
  <logger level="INFO" doc:name="Log Request" doc:id="53992e7f-84cf-4c29-bb74-2be27a2ececf" message="'request received - ' #[payload]"/>
    <parallel-foreach doc:name="parallel For Each" doc:id="81acc47f-7b50-4806-95ea-6e7f24cd6683" collection="payload">
      <ee:transform doc:name="Transform Message" doc:id="751032f3-11f6-4ce3-b136-73e534bd6224" >
        <ee:message >
          <ee:set-payload ><![CDATA[%dw 2.0
import * from dw::Runtime
output application/json
---
msg : payload.username ++ ' processed' wait 5000]]></ee:set-payload>
        </ee:message>
        <ee:variables >
        </ee:variables>
      </ee:transform>
      <logger level="INFO" doc:name="for-each output" doc:id="bfe598c4-6b02-4d36-8fa9-00d9cb2a8cce" message="for-each output:  #[payload]"/>
    </parallel-foreach>
    <set-payload value="#[%dw 2.0
output application/json
---
payload]" doc:name="Set Payload" doc:id="989fedef-40e2-4e74-87e6-577bebca3b4c" />
    <logger level="INFO" doc:name="Logger" doc:id="fa4eb2fb-a70c-4489-aaff-81eafa03213f" message="#[payload]"/>
  
</sub-flow>

Request:

Output:

Logs: In the logs we can see that the messages are processed in parallel.

Parallel Processing in Batches

In this example we will execute parallel processing but in batches. If we are connecting to an external system (suppose salesforce), and need to send the request in batches of 200 and all the batches should be executed in parallel.
How can we achieve this? is simple, by using divideBy function.

<parallel-foreach doc:name="parallel For Each" doc:id="3641e6b1-e499-4528-b6f6-d9ad7545368e" collection="#[import * from dw::core::Arrays output application/json --- payload divideBy 2]">

Here for example, if we receive 10 records. 10 records will be spit/divided into sets of 2 and 5 jobs will be created that will executed in parallel and processed.

In the below code we are dividing the payload received into set of 2, then transforming the message received with a delay of 5 sec so that we can clearly see in API logs if messages processed in parallel or not.

<sub-flow name="addUsersBatchParallelForEach" doc:id="aedbbefb-d38f-4ee1-a7e3-dc537645da5e" >
  <logger level="INFO" doc:name="Log Request" doc:id="1e9de3c9-cce7-4744-9010-c3b9b2a100ab" message="'request received - ' #[payload]"/>
    <parallel-foreach doc:name="parallel For Each" doc:id="3641e6b1-e499-4528-b6f6-d9ad7545368e" collection="#[import * from dw::core::Arrays output application/json --- payload divideBy 2]">
      <flow-ref doc:name="Flow Reference" doc:id="2bf73bb0-1916-47fd-967d-4cdde18428f3" name="addUsersSub_Flow_BatchParallelForEach"/>
    </parallel-foreach>
    
    <set-payload value="#[%dw 2.0
output application/json
---
flatten (payload.payload)]" doc:name="Set Payload" doc:id="bbee3431-7975-4528-93cf-3955ee4011cc" />
    <logger level="INFO" doc:name="Logger" doc:id="fffae7bf-573d-4b27-9a69-26ecedde5d78" message="#[payload]"/>
  
</sub-flow>
  <sub-flow name="addUsersSub_Flow_BatchParallelForEach" doc:id="eb15de26-5035-47ab-8183-4e6efbe49b80">
  <ee:transform doc:name="Transform Message" doc:id="eaaebb0c-539a-4269-8295-2701b5c6397a" >
        <ee:message >
          <ee:set-payload ><![CDATA[%dw 2.0
import * from dw::Runtime
output application/json
---
(payload map {
  msg : $.username ++ ' processed' 
}) wait 5000 
]]></ee:set-payload>
        </ee:message>
        <ee:variables >
        </ee:variables>
      </ee:transform>
      <logger level="INFO" doc:name="for-each output" doc:id="198b6135-10e6-4882-bd1d-1686dd3f49fd" message="for-each output:  #[payload]"/>
  </sub-flow>

Output:
To get only the message payload received after processing we are using flatten (payload.payload)

Logs:

Varun Goel

About Varun Goel

Varun Goel is a technology enthusiast with 6+ years exp in IT industry. In fact, he is been developing application after schooling as freelancer. Currently working with one of the Fortune’s 100 Companies having vast experience Mule ESB, Tibco, HTML5, CSS, JSS, Android, Core Java, JSP, PHP, MySQL, AutoCAD, Maya, ZBrush, Photoshop, Flash CS and many more.

Leave a Reply

Your email address will not be published.