Daily Archives: January 11, 2020

Mule 4 Mulesoft Basics Mulesoft Tutorial

Parallel For Each in Mule 4

The Parallel For-Each scope enables you to process a collection of messages by splitting the collection into parts that are simultaneously processed in separate routes. After all messages are processed, the results are aggregated following the same order they were in before the split, and then the flow continues.

In the tutorial below we will see how we can use Parallel For Each in your project.

Download Parallel For Each Example

Syntax:

<parallel-foreach doc:name="parallel For Each" collection="payload">
<!-- Code to be Processed parallel -->
</parallel-foreach>
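
Besides collection, the scope accepts a few optional attributes. The snippet below is a minimal sketch, assuming you want to cap parallelism and fail slow routes; the values 4 and 10000 and the logger message are arbitrary placeholders, not taken from the example project.

```xml
<!-- Sketch only: attribute values are illustrative assumptions -->
<!-- maxConcurrency limits how many routes run at the same time -->
<!-- timeout is the per-route timeout in milliseconds -->
<parallel-foreach doc:name="parallel For Each" collection="#[payload]" maxConcurrency="4" timeout="10000">
  <!-- Inside the scope, payload is the single element being processed -->
  <logger level="INFO" message="processing #[payload]"/>
</parallel-foreach>
```

Leaving both attributes out keeps the default behaviour, where routes never time out.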

Parallel For Each

In this example we will send a JSON array as the request, which Parallel For Each will split so that its elements are processed in parallel.
Inside Parallel For Each we transform each received element with a delay of 5 seconds, so that we can clearly see in the logs whether our API has processed the messages in parallel.

<sub-flow name="addUsersParallelForEach" doc:id="5820e110-740b-48aa-baf2-b4f0fa68716a" >
  <logger level="INFO" doc:name="Log Request" doc:id="53992e7f-84cf-4c29-bb74-2be27a2ececf" message="request received - #[payload]"/>
  <!-- Each element of the incoming array is processed in its own route -->
  <parallel-foreach doc:name="parallel For Each" doc:id="81acc47f-7b50-4806-95ea-6e7f24cd6683" collection="payload">
    <!-- wait (from dw::Runtime) delays the result by 5000 ms so the parallel execution shows up in the logs -->
    <ee:transform doc:name="Transform Message" doc:id="751032f3-11f6-4ce3-b136-73e534bd6224" >
      <ee:message >
        <ee:set-payload ><![CDATA[%dw 2.0
import * from dw::Runtime
output application/json
---
msg : payload.username ++ ' processed' wait 5000]]></ee:set-payload>
      </ee:message>
      <ee:variables >
      </ee:variables>
    </ee:transform>
    <logger level="INFO" doc:name="for-each output" doc:id="bfe598c4-6b02-4d36-8fa9-00d9cb2a8cce" message="for-each output:  #[payload]"/>
  </parallel-foreach>
  <!-- After the scope, the payload holds the aggregated results in their original order -->
  <set-payload value="#[%dw 2.0
output application/json
---
payload]" doc:name="Set Payload" doc:id="989fedef-40e2-4e74-87e6-577bebca3b4c" />
  <logger level="INFO" doc:name="Logger" doc:id="fa4eb2fb-a70c-4489-aaff-81eafa03213f" message="#[payload]"/>
</sub-flow>

Request:

Output:

Logs: In the logs we can see that the messages are processed in parallel.

Parallel Processing in Batches

In this example we will run the parallel processing in batches. Suppose we are connecting to an external system such as Salesforce and need to send requests in batches of 200, with all the batches executed in parallel.
This is simple to achieve with the divideBy function from the dw::core::Arrays module.

<parallel-foreach doc:name="parallel For Each" doc:id="3641e6b1-e499-4528-b6f6-d9ad7545368e" collection="#[import * from dw::core::Arrays output application/json --- payload divideBy 2]">

For example, if we receive 10 records, they will be split into sets of 2, and 5 jobs will be created and executed in parallel.
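
To make the split concrete, here is a minimal standalone sketch of divideBy on a hard-coded array of ten numbers. The doc:name and the input values are made up for illustration and are not part of the example project.

```xml
<!-- Sketch only: hard-coded input to show how divideBy chunks an array -->
<ee:transform doc:name="divideBy sketch">
  <ee:message>
    <ee:set-payload><![CDATA[%dw 2.0
import divideBy from dw::core::Arrays
output application/json
---
// Ten items split into sub-arrays of 2 give five batches:
// [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] divideBy 2]]></ee:set-payload>
  </ee:message>
</ee:transform>
```

For the batches of 200 mentioned earlier, the same expression would use divideBy 200; when the array length is not a multiple of the batch size, the last sub-array is simply shorter.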

In the code below we divide the received payload into sets of 2, then transform each set with a delay of 5 seconds so that we can clearly see in the API logs whether the messages were processed in parallel.

<sub-flow name="addUsersBatchParallelForEach" doc:id="aedbbefb-d38f-4ee1-a7e3-dc537645da5e" >
  <logger level="INFO" doc:name="Log Request" doc:id="1e9de3c9-cce7-4744-9010-c3b9b2a100ab" message="request received - #[payload]"/>
  <!-- divideBy 2 turns the payload into sub-arrays of 2; each sub-array is processed in its own route -->
  <parallel-foreach doc:name="parallel For Each" doc:id="3641e6b1-e499-4528-b6f6-d9ad7545368e" collection="#[import * from dw::core::Arrays output application/json --- payload divideBy 2]">
    <flow-ref doc:name="Flow Reference" doc:id="2bf73bb0-1916-47fd-967d-4cdde18428f3" name="addUsersSub_Flow_BatchParallelForEach"/>
  </parallel-foreach>
  <!-- Merge the per-batch payloads back into a single array -->
  <set-payload value="#[%dw 2.0
output application/json
---
flatten (payload.payload)]" doc:name="Set Payload" doc:id="bbee3431-7975-4528-93cf-3955ee4011cc" />
  <logger level="INFO" doc:name="Logger" doc:id="fffae7bf-573d-4b27-9a69-26ecedde5d78" message="#[payload]"/>
</sub-flow>
<sub-flow name="addUsersSub_Flow_BatchParallelForEach" doc:id="eb15de26-5035-47ab-8183-4e6efbe49b80">
  <!-- Processes one batch: maps every record, then waits 5000 ms so the parallel execution shows up in the logs -->
  <ee:transform doc:name="Transform Message" doc:id="eaaebb0c-539a-4269-8295-2701b5c6397a" >
    <ee:message >
      <ee:set-payload ><![CDATA[%dw 2.0
import * from dw::Runtime
output application/json
---
(payload map {
  msg : $.username ++ ' processed'
}) wait 5000
]]></ee:set-payload>
    </ee:message>
    <ee:variables >
    </ee:variables>
  </ee:transform>
  <logger level="INFO" doc:name="for-each output" doc:id="198b6135-10e6-4882-bd1d-1686dd3f49fd" message="for-each output:  #[payload]"/>
</sub-flow>

Output:
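The scope returns a list of messages, one per batch. To get only the payloads produced after processing, we use flatten (payload.payload), which picks the payload of each message and merges the per-batch arrays into a single array.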
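
The effect of flatten (payload.payload) can be sketched in isolation. In the snippet below, the results variable is a hypothetical stand-in for what the scope returns (one entry per batch, each carrying its own payload); the user names are invented for illustration.

```xml
<!-- Sketch only: results is a hand-written stand-in for the scope's output -->
<ee:transform doc:name="flatten sketch">
  <ee:message>
    <ee:set-payload><![CDATA[%dw 2.0
output application/json
// Hypothetical result shape: one entry per batch, each with its own payload array
var results = [
  { payload: [ { msg: "user1 processed" }, { msg: "user2 processed" } ] },
  { payload: [ { msg: "user3 processed" }, { msg: "user4 processed" } ] }
]
---
// results.payload selects each entry's payload (an array of arrays);
// flatten merges them into one array of four msg objects
flatten(results.payload)]]></ee:set-payload>
  </ee:message>
</ee:transform>
```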

Logs:

Mule 4 Mulesoft Basics Mulesoft Tutorial

Executing DataWeave Dynamically

If we want to keep a DataWeave expression outside the Mule project and load and process it at runtime, we need the Dynamic Evaluate component.

Download Dynamic Evaluate Project Example

Consider a scenario in which DataWeave mapping conditions are expected to change frequently based on the client's requirements, and you don't want to redeploy running APIs again and again. In such a scenario we can store the DataWeave expression in a DB, S3, or another location, then read and evaluate it dynamically in our Mule API. Any change made to this external DataWeave script is picked up the next time Mule reads it from the external source.

In the example below we use the variable dynamic_dw to store the DataWeave expression as a String. In a real-world application this expression would come from an external source such as a DB or an SFTP server and then be stored in a variable.

Request:

Response:

Code:

<sub-flow name="dynamic-evaluateSub_Flow" doc:id="2145c3c0-5196-418f-9dd3-adf06966cc4a" >
  <set-variable value="#[%dw 2.0 
output application/json 
---
payload]" doc:name="Store payload" doc:id="18b55a69-28fd-48ae-9344-f80e9be3ffc6" variableName="reqReceived"/>
  <set-variable value="#['%dw 2.0 output application/json --- vars.reqReceived.username']" doc:name="datawave received from external source" doc:id="29aebfba-73e4-41b3-9f3f-e508f98da413" variableName="dynamic_dw"/>
  <logger level="INFO" doc:name="datawave received" doc:id="672895f7-42e8-4c25-8dca-b11bd61634b3" message="#['script - ' ++ vars.dynamic_dw]"/>
  <ee:dynamic-evaluate doc:name="Dynamic Evaluate" doc:id="59d877bb-36ba-4193-bf72-df5083a06d22" expression="#[vars.dynamic_dw]"/>
  <logger level="INFO" doc:name="output" doc:id="fea7a7e5-b8d3-4726-80b8-a846c3794a71" message="#[payload]"/>
</sub-flow>
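
Dynamic Evaluate can also pass extra values to the script through its parameters element, which avoids copying the request into a variable first. The sketch below assumes a hypothetical script that uses a parameter named prefix; the sub-flow name, the parameter, and the greeting field are invented for illustration and are not part of the downloadable project.

```xml
<!-- Sketch only: sub-flow name, script and parameter are illustrative assumptions -->
<sub-flow name="dynamic-evaluate-with-parameters">
  <!-- In practice this script string would be read from a DB, S3, SFTP, etc. -->
  <set-variable variableName="dynamic_dw" value="#['%dw 2.0 output application/json --- { greeting: prefix ++ payload.username }']"/>
  <ee:dynamic-evaluate doc:name="Dynamic Evaluate" expression="#[vars.dynamic_dw]">
    <!-- Each key of this object becomes a name the script can reference -->
    <ee:parameters>#[{ prefix: 'Hello, ' }]</ee:parameters>
  </ee:dynamic-evaluate>
  <logger level="INFO" doc:name="output" message="#[payload]"/>
</sub-flow>
```

The script still has access to the usual payload and vars bindings, so parameters are only needed for values that are not already part of the message.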