MuleSoft Scatter-Gather Scope
In this tutorial we will look at various configuration properties of Scatter-Gather with examples in detail and also see how to handle exception in Scatter-Gather. Why use Scatter-Gather in Mulesoft: Please read Validation Framework to understand how error is generated in the example. |
Running Scatter Gather with default configuration:
First we will look at a simple example of Scatter Gather and its configuration properties. We have created a simple flow with scatter gather that send same message to 3 different routes for processing. We have used groovy script in “Groovy” component to sleep the thread processing the current route for few sec, so to see clearly how multiple routes are getting executed. Flow Design: <flow name="post:/users:application/json:validationFrameWork-config"> <logger message="Flow Started" level="INFO" doc:name="Logger"/> <object-to-byte-array-transformer doc:name="Object to Byte Array"/> <scatter-gather doc:name="Scatter-Gather"> <processor-chain> <logger level="INFO" message="Route 1 started" doc:name="Logger"/> <scripting:component doc:name="Groovy"> <scripting:script engine="Groovy"><![CDATA[sleep(1000); return message.payload;]]></scripting:script> </scripting:component> <logger level="INFO" message="Route 1 completed" doc:name="Logger"/> </processor-chain> <processor-chain> <logger level="INFO" message="Route 2 started" doc:name="Logger"/> <scripting:component doc:name="Groovy"> <scripting:script engine="Groovy"><![CDATA[sleep(1000); return message.payload;]]></scripting:script> </scripting:component> <logger level="INFO" message="Route 2 completed" doc:name="Logger"/> </processor-chain> <processor-chain> <logger message="Route 3 started" level="INFO" doc:name="Logger"/> <scripting:component doc:name="Groovy"> <scripting:script engine="Groovy"><![CDATA[sleep(1000); return message.payload;]]></scripting:script> </scripting:component> <logger level="INFO" message="Route 3 completed" doc:name="Logger"/> </processor-chain> </scatter-gather> <set-payload value="#[message.payloadAs(java.lang.String)]" mimeType="application/json" doc:name="Set Payload"/> </flow>
On running the application with default configuration of scatter gather we can see below output: Output Log: Note: From the output we can see, when scatter gather is configured with default configuration it executes all the routes parallel by creating number of threads equal to number of routes in scatter-gather. Download Mulesoft Scatter Gather Project |
Scatter-Gather Configuration
Timeout: To sets the timeout for responses from sent messages, in milliseconds. If the route been executed doesnot respond within the specified time, the route will timeout and throw an exception. A value of 0 or lower than 0 means no timeout. Threading Profile: Optionally, use to customize the threading profile. Do Threading: To set if Threading Profile is to be applied on all the request send to the application. maxThreadsActive: The maximum number of threads to use. If set to 1, scatter-gather will execute the routes sequentially. maxThreadsIdle: The maximum number of idle or inactive threads that can be in the pool before they are destroyed. threadTTL: Determines how long an inactive thread is kept in the pool before being discarded. poolExhaustedAction: When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are: threadWaitTimeout: How long to wait in milliseconds when the pool exhausted action is WAIT. If the value is negative, it waits indefinitely. maxBufferSize: Determines how many requests are queued when the pool is at maximum usage capacity and the pool exhausted action is WAIT. The buffer is used as a kind of throttling for thread creation, before requests are processed. |
Setting Timeout in Scatter-Gather
For this example we will increase sleep time in first route to 3 sec. And Configure Timeout value in Scatter-Gather to 2000 ( 2 sec). In the output we will see that route 1 has timed out as route 1 was taking more time to complete the execution than what was specified in Scatter-Gather configuration. Output Log: |
Setting Max Active Thread
In the below example we will set Max Active thread to different values and see how it works. Setting max Active Thread to 1 and set Pool Exhausted Action to WAIT. On running the application we can see the output for the logger is sequential, this is because max Active thread will allow only 1 thread to run routes in scatter-gather thus route1 will execute first then route 2 and then route 3. Also Pool Exhausted Action set to WAIT will make thread wait until current route in execution is complete. Note: This is how we can achieve sequential processing in scatter gather component. Setting max Active Thread to 2 and set Pool Exhausted Action to WAIT. On setting max Active Thread to 2 in scatter-gather, mule will create 2 max thread to execute all the routes configured, thus Setting max Active Thread to 3 and set Pool Exhausted Action to WAIT. |
Exception Handling for Scatter-Gather.
Please read Validation Framework to understand how error is generated in the example. • sets the exception payload accordingly for each route • throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID Catching the CompositeRoutingException allows you to gather information on all failed routes. In the below example; for 2 routes inside Scatter-Gather we have used “Expression” component that will throw an error with error message “Error from Route 1/2”. We will use Choice Exception Strategy to handle the error and send the error message.
<flow name="post:/users:application/json:scatterGather-config"> <flow name="post:/users:application/json:scatterGather-config"> <logger message="Flow Started" level="INFO" doc:name="Logger"/> <object-to-byte-array-transformer doc:name="Object to Byte Array"/> <scatter-gather doc:name="Scatter-Gather"> <threading-profile maxThreadsActive="2" poolExhaustedAction="WAIT"/> <processor-chain> <logger level="INFO" message="Route 1 started" doc:name="Logger"/> <expression-component doc:name="Expression"><![CDATA[generateValidationException("Error From Route 1");]]></expression-component> <scripting:component doc:name="Groovy"> <scripting:script engine="Groovy"><![CDATA[sleep(1000); return message.payload;]]></scripting:script> </scripting:component> <logger level="INFO" message="Route 1 completed" doc:name="Logger"/> </processor-chain> <processor-chain> <logger level="INFO" message="Route 2 started" doc:name="Logger"/> <expression-component doc:name="Expression"><![CDATA[generateValidationException("Error From Route 2");]]></expression-component> <scripting:component doc:name="Groovy"> <scripting:script engine="Groovy"><![CDATA[sleep(1000); return message.payload;]]></scripting:script> </scripting:component> <logger level="INFO" message="Route 2 completed" doc:name="Logger"/> </processor-chain> <processor-chain> <logger message="Route 3 started" level="INFO" doc:name="Logger"/> <scripting:component doc:name="Groovy"> <scripting:script engine="Groovy"><![CDATA[sleep(1000); return message.payload;]]></scripting:script> </scripting:component> <logger level="INFO" message="Route 3 completed" doc:name="Logger"/> </processor-chain> </scatter-gather> <set-payload value="#[message.payloadAs(java.lang.String)]" mimeType="application/json" doc:name="Set Payload"/> <exception-strategy ref="Choice_Exception_Strategy" doc:name="Reference Exception Strategy"/> </flow> The exception generated is of class CompositeRoutingException which extends the Mule MessagingException to aggregate exceptions from different routes in the context of a single message router. Exceptions are correlated to each route through a sequential ID. Exception Block: We have used dataweave to loop through the exceptions and get the error from each route that has failed. <choice-exception-strategy name="Choice_Exception_Strategy"> <catch-exception-strategy when="#[exception.causeMatches('*ValidationException*')]" doc:name="Validation Exception Strategy"> <set-variable variableName="exceptionMessage" value="#[groovy:message.getExceptionPayload().getRootException().getMessage()]" doc:name="Set exceptionMessage"/> <set-property propertyName="http.status" value="500" doc:name="Property"/> <dw:transform-message doc:name="Transform Message"> <dw:set-payload><![CDATA[%dw 1.0 %output application/json --- { errorMessage: flowVars.exceptionMessage }]]></dw:set-payload> </dw:transform-message> </catch-exception-strategy> <catch-exception-strategy doc:name="CompositeRoutingException" when="#[exception.causeMatches('*CompositeRoutingException*')]"> <set-payload value="#[exception]" doc:name="Set Payload"/> <dw:transform-message doc:name="Transform Message"> <dw:set-payload><![CDATA[%dw 1.0 %output application/json --- { errors: payload.exceptions map ((error, position) -> position ++ ":" ++ error.causeException.message ) }]]></dw:set-payload> </dw:transform-message> </catch-exception-strategy> <catch-exception-strategy doc:name="Catch All Exception"> <set-payload value="#[groovy:message.getExceptionPayload().getRootException().getMessage()]" doc:name="Set Payload"/> </catch-exception-strategy> </choice-exception-strategy>
Note: We can also use |
Hey Varun, Looking forward for more posts on Mule. Please dont let the thread die. Thanks for your efforts
Thanks Avinash, sure will be add more post soon 🙂
Varun Goel, thank you for this post.
Welcome Prajeet