The Parallel For-Each scope enables you to process a collection of messages by splitting the collection into parts that are simultaneously processed in separate routes. After all messages are processed, the results are aggregated following the same order they were in before the split, and then the flow continues.
In this tutorial we will see how to use Parallel For Each in your project.
Syntax:
<parallel-foreach doc:name="parallel For Each" collection="payload">
    <!-- Code to be Processed parallel -->
</parallel-foreach>
Parallel For Each
In this example we will send a JSON message as an array, which will be split by parallel for each and executed in parallel.
Inside Parallel For Each we transform each received message with a delay of 5 seconds, so that the logs clearly show whether our API processed the messages in parallel.
<sub-flow name="addUsersParallelForEach" doc:id="5820e110-740b-48aa-baf2-b4f0fa68716a" >
    <logger level="INFO" doc:name="Log Request" doc:id="53992e7f-84cf-4c29-bb74-2be27a2ececf" message="'request received - ' #[payload]"/>
    <parallel-foreach doc:name="parallel For Each" doc:id="81acc47f-7b50-4806-95ea-6e7f24cd6683" collection="payload">
        <ee:transform doc:name="Transform Message" doc:id="751032f3-11f6-4ce3-b136-73e534bd6224" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
import * from dw::Runtime
output application/json
---
msg : payload.username ++ ' processed' wait 5000]]></ee:set-payload>
            </ee:message>
            <ee:variables >
            </ee:variables>
        </ee:transform>
        <logger level="INFO" doc:name="for-each output" doc:id="bfe598c4-6b02-4d36-8fa9-00d9cb2a8cce" message="for-each output: #[payload]"/>
    </parallel-foreach>
    <set-payload value="#[%dw 2.0 output application/json --- payload]" doc:name="Set Payload" doc:id="989fedef-40e2-4e74-87e6-577bebca3b4c" />
    <logger level="INFO" doc:name="Logger" doc:id="fa4eb2fb-a70c-4489-aaff-81eafa03213f" message="#[payload]"/>
</sub-flow>
Request:
Output:
Logs: In the logs we can see that the messages are processed in parallel.
Parallel Processing in Batches
In this example we will run parallel processing in batches. Suppose we are connecting to an external system (for example, Salesforce) and need to send the requests in batches of 200, with all the batches executed in parallel.
How can we achieve this? It is simple: use the divideBy function.
<parallel-foreach doc:name="parallel For Each" doc:id="3641e6b1-e499-4528-b6f6-d9ad7545368e" collection="#[import * from dw::core::Arrays output application/json --- payload divideBy 2]">
For example, if we receive 10 records, they will be split into sets of 2, creating 5 jobs that are processed in parallel.
In the code below we divide the received payload into sets of 2, then transform each set with a delay of 5 seconds so that the API logs clearly show whether the messages were processed in parallel.
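To see what divideBy produces on its own, the following is a minimal standalone DataWeave sketch. The `users` variable and its generated usernames are made up for illustration and are not part of the original project; in the flow the input would be the incoming payload.

```dataweave
%dw 2.0
import * from dw::core::Arrays
output application/json
// Hypothetical input: ten records with invented usernames (user1 ... user10)
var users = (1 to 10) map ((n) -> { username: "user" ++ (n as String) })
---
// Split the ten records into sub-arrays of at most 2 elements each
users divideBy 2
```

The result is an array of five sub-arrays with two records each, and Parallel For Each then treats every sub-array as one element of its collection.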
<sub-flow name="addUsersBatchParallelForEach" doc:id="aedbbefb-d38f-4ee1-a7e3-dc537645da5e" >
    <logger level="INFO" doc:name="Log Request" doc:id="1e9de3c9-cce7-4744-9010-c3b9b2a100ab" message="'request received - ' #[payload]"/>
    <parallel-foreach doc:name="parallel For Each" doc:id="3641e6b1-e499-4528-b6f6-d9ad7545368e" collection="#[import * from dw::core::Arrays output application/json --- payload divideBy 2]">
        <flow-ref doc:name="Flow Reference" doc:id="2bf73bb0-1916-47fd-967d-4cdde18428f3" name="addUsersSub_Flow_BatchParallelForEach"/>
    </parallel-foreach>
    <set-payload value="#[%dw 2.0 output application/json --- flatten (payload.payload)]" doc:name="Set Payload" doc:id="bbee3431-7975-4528-93cf-3955ee4011cc" />
    <logger level="INFO" doc:name="Logger" doc:id="fffae7bf-573d-4b27-9a69-26ecedde5d78" message="#[payload]"/>
</sub-flow>
<sub-flow name="addUsersSub_Flow_BatchParallelForEach" doc:id="eb15de26-5035-47ab-8183-4e6efbe49b80">
    <ee:transform doc:name="Transform Message" doc:id="eaaebb0c-539a-4269-8295-2701b5c6397a" >
        <ee:message >
            <ee:set-payload ><![CDATA[%dw 2.0
import * from dw::Runtime
output application/json
---
(payload map {
    msg : $.username ++ ' processed'
}) wait 5000
]]></ee:set-payload>
        </ee:message>
        <ee:variables >
        </ee:variables>
    </ee:transform>
    <logger level="INFO" doc:name="for-each output" doc:id="198b6135-10e6-4882-bd1d-1686dd3f49fd" message="for-each output: #[payload]"/>
</sub-flow>
Output:
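Parallel For Each returns a list of messages, one per iteration, and here each message carries the payload of one batch. To get only those payloads, merged into a single flat list, we use flatten (payload.payload).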
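The effect of that expression can be sketched in isolation. In the snippet below, the `results` variable is a hypothetical stand-in for the scope's output: plain objects with a `payload` key are used in place of real Mule messages, and the two batches of sample values are invented for illustration.

```dataweave
%dw 2.0
output application/json
// Hypothetical stand-in for the Parallel For Each output:
// one entry per batch, each holding that batch's payload
var results = [
    { payload: [ { msg: "user1 processed" }, { msg: "user2 processed" } ] },
    { payload: [ { msg: "user3 processed" }, { msg: "user4 processed" } ] }
]
---
// results.payload selects each batch's payload (an array of arrays);
// flatten merges those arrays into one flat list
flatten(results.payload)
```

With this input the script yields a single array of four msg objects, which is the shape the Set Payload step in the flow is meant to produce.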
Logs: