
Scatter-Gather In Depth – MuleSoft Tutorial


MuleSoft Scatter-Gather Scope


In this tutorial we will look at the configuration properties of Scatter-Gather in detail, with examples, and see how to handle exceptions in Scatter-Gather.

Why use Scatter-Gather in MuleSoft:
To process multiple routes in parallel in Mule, we can use Scatter-Gather. The Scatter-Gather routing message processor sends a request message concurrently to all the routes configured inside it, collects the responses from every route, and aggregates them into a single message. Multiple threads are created to execute the routes simultaneously.
Scatter-Gather can also execute the routes sequentially, by limiting it to a single active thread.

Please read Validation Framework to understand how the error is generated in the exception handling example.

Running Scatter Gather with default configuration:


First we will look at a simple example of Scatter Gather and its configuration properties.

We have created a simple flow with Scatter-Gather that sends the same message to 3 different routes for processing. Each route uses a Groovy script in a “Groovy” component to put the thread processing that route to sleep for 1 second, so that we can clearly see how the routes are executed.
We use an “Object to Byte Array” transformer before Scatter-Gather because the POST payload is an InputStream, which can be consumed only once and so cannot be dispatched to several routes.

Flow Design:
[Image: basic-scatter-gather-flow]

<flow name="post:/users:application/json:validationFrameWork-config">
  <logger message="Flow Started" level="INFO" doc:name="Logger"/>
  <object-to-byte-array-transformer doc:name="Object to Byte Array"/>
  <scatter-gather doc:name="Scatter-Gather">
    <processor-chain>
      <logger level="INFO" message="Route 1 started" doc:name="Logger"/>
      <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(1000);
return message.payload;]]></scripting:script>
      </scripting:component>
      <logger level="INFO" message="Route 1 completed" doc:name="Logger"/>
    </processor-chain>
    <processor-chain>
      <logger level="INFO" message="Route 2 started" doc:name="Logger"/>
      <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(1000);
return message.payload;]]></scripting:script>
      </scripting:component>
      <logger level="INFO" message="Route 2 completed" doc:name="Logger"/>
    </processor-chain>
    <processor-chain>
      <logger message="Route 3 started" level="INFO" doc:name="Logger"/>
      <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(1000);
return message.payload;]]></scripting:script>
      </scripting:component>
      <logger level="INFO" message="Route 3 completed" doc:name="Logger"/>
    </processor-chain>
  </scatter-gather>
  <set-payload value="#[message.payloadAs(java.lang.String)]" mimeType="application/json" doc:name="Set Payload"/>
</flow>

 

Running the application with the default Scatter-Gather configuration produces the output below:

Output Log:
[Image: scatter-gather-basic-log]

Note: The output shows that with the default configuration Scatter-Gather executes all the routes in parallel, creating as many threads as there are routes.

Download Mulesoft Scatter Gather Project


Scatter-Gather Configuration



[Image: scatter-gather-config]

Timeout: Sets the timeout, in milliseconds, for responses from the routes. If a route being executed does not respond within the specified time, it times out and throws an exception. A value of 0 or lower means no timeout.

Threading Profile: Optional; use it to customize the threading profile.

Do Threading: Sets whether the threading profile is applied to the requests sent to the application.

maxThreadsActive: The maximum number of threads to use. If set to 1, scatter-gather will execute the routes sequentially.

maxThreadsIdle: The maximum number of idle or inactive threads that can be in the pool before they are destroyed.

threadTTL: Determines how long an inactive thread is kept in the pool before being discarded.

poolExhaustedAction: When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are:
WAIT – Wait until a thread becomes available; don’t use this value if the minimum number of threads is zero, in which case a thread may never become available.
DISCARD – Throw away the current request and return.
DISCARD_OLDEST – Throw away the oldest request and return.
ABORT – Throw a RuntimeException.
RUN – The default; the thread making the execute request runs the task itself, which helps guard against lockup.

threadWaitTimeout: How long to wait in milliseconds when the pool exhausted action is WAIT. If the value is negative, it waits indefinitely.

maxBufferSize: Determines how many requests are queued when the pool is at maximum usage capacity and the pool exhausted action is WAIT. The buffer is used as a kind of throttling for thread creation, before requests are processed.
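
As a rough sketch of how the properties above fit together in XML, the threading profile is a child element of Scatter-Gather. The attribute names follow the property names listed above; every value shown is an illustrative assumption rather than a recommendation, and the fragment assumes the enclosing configuration declares the same namespaces as the flows in this tutorial.

```xml
<scatter-gather timeout="5000" doc:name="Scatter-Gather">
  <!-- Illustrative values only; tune them for your own workload -->
  <threading-profile
      doThreading="true"
      maxThreadsActive="4"
      maxThreadsIdle="2"
      threadTTL="60000"
      poolExhaustedAction="WAIT"
      threadWaitTimeout="5000"
      maxBufferSize="10"/>
  <!-- The processor-chain routes go here, as in the flow shown earlier -->
</scatter-gather>
```

threadWaitTimeout is set here only because poolExhaustedAction is WAIT; with any other action it would have no effect.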

Setting Timeout in Scatter-Gather


For this example we will increase the sleep time in the first route to 3 seconds.

[Image: scatter-gather-3secconfig]

Then configure the Timeout value in Scatter-Gather to 2000 (2 seconds).

[Image: set-timeout]
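
In XML, the two changes for this example might look like the sketch below. It assumes the same three-route flow used earlier, with the timeout set through the timeout attribute on Scatter-Gather; route 3 is omitted for brevity and would be identical to route 2 apart from its log messages.

```xml
<!-- Timeout of 2000 ms applies to every route -->
<scatter-gather timeout="2000" doc:name="Scatter-Gather">
  <processor-chain>
    <logger level="INFO" message="Route 1 started" doc:name="Logger"/>
    <!-- Route 1 now sleeps for 3 seconds, longer than the timeout -->
    <scripting:component doc:name="Groovy">
      <scripting:script engine="Groovy"><![CDATA[sleep(3000);
return message.payload;]]></scripting:script>
    </scripting:component>
    <logger level="INFO" message="Route 1 completed" doc:name="Logger"/>
  </processor-chain>
  <processor-chain>
    <logger level="INFO" message="Route 2 started" doc:name="Logger"/>
    <!-- Route 2 still sleeps for 1 second, within the timeout -->
    <scripting:component doc:name="Groovy">
      <scripting:script engine="Groovy"><![CDATA[sleep(1000);
return message.payload;]]></scripting:script>
    </scripting:component>
    <logger level="INFO" message="Route 2 completed" doc:name="Logger"/>
  </processor-chain>
</scatter-gather>
```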

In the output we can see that route 1 timed out, because it took longer to complete than the timeout specified in the Scatter-Gather configuration.

Output Log:
[Image: timeout-error]

Setting Max Active Thread


In the examples below we will set Max Active Threads to different values and see how each one behaves.

Setting Max Active Threads to 1 and Pool Exhausted Action to WAIT.

Configuration:
[Image: maxthreadis1]
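
Based on the threading-profile element used in the exception handling flow later in this tutorial, the equivalent XML for this configuration would look roughly like the sketch below. The routes themselves are unchanged from the first example.

```xml
<scatter-gather doc:name="Scatter-Gather">
  <!-- One active thread: routes run one after another.
       WAIT: pending routes wait for that thread to become free. -->
  <threading-profile maxThreadsActive="1" poolExhaustedAction="WAIT"/>
  <!-- The three processor-chain routes go here, as in the first example -->
</scatter-gather>
```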

On running the application we can see that the logger output is sequential. This is because Max Active Threads allows only 1 thread to run the routes in Scatter-Gather, so route 1 executes first, then route 2, and then route 3. Setting Pool Exhausted Action to WAIT makes each pending route wait until the route currently executing is complete.

[Image: maxthreadis1-log]

Note: This is how we can achieve sequential processing in the Scatter-Gather component.

Setting Max Active Threads to 2 and Pool Exhausted Action to WAIT.

With Max Active Threads set to 2, Mule creates at most 2 threads to execute all the configured routes, so route 1 and route 2 execute in parallel and route 3 is executed after they complete.

[Image: maxthreadis2-log]

Setting Max Active Threads to 3 and Pool Exhausted Action to WAIT.

All 3 routes execute in parallel, as 3 threads are created to execute the routes in Scatter-Gather.

Exception Handling for Scatter-Gather.


Please read Validation Framework to understand how error is generated in the example.
In this section we will see how to handle exceptions when any of the routes being executed fails.
By default, if any route fails, Scatter-Gather performs the following actions:

• sets the exception payload accordingly for each route

• throws a CompositeRoutingException, which maps each exception to its corresponding route using a sequential route ID

Catching the CompositeRoutingException allows you to gather information on all failed routes.

In the example below, 2 of the routes inside Scatter-Gather use an “Expression” component that throws an error with the message “Error From Route 1” or “Error From Route 2”. We use a Choice Exception Strategy to handle the error and send back the error message.

Flow Design
[Image: exception-scatter-gather-flow]

<flow name="post:/users:application/json:scatterGather-config">
  <logger message="Flow Started" level="INFO" doc:name="Logger"/>
  <object-to-byte-array-transformer doc:name="Object to Byte Array"/>
  <scatter-gather doc:name="Scatter-Gather">
    <threading-profile maxThreadsActive="2" poolExhaustedAction="WAIT"/>
    <processor-chain>
      <logger level="INFO" message="Route 1 started" doc:name="Logger"/>
      <expression-component doc:name="Expression"><![CDATA[generateValidationException("Error From Route 1");]]></expression-component>
      <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(1000);
return message.payload;]]></scripting:script>
      </scripting:component>
      <logger level="INFO" message="Route 1 completed" doc:name="Logger"/>
    </processor-chain>
    <processor-chain>
      <logger level="INFO" message="Route 2 started" doc:name="Logger"/>
      <expression-component doc:name="Expression"><![CDATA[generateValidationException("Error From Route 2");]]></expression-component>
      <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(1000);
return message.payload;]]></scripting:script>
      </scripting:component>
      <logger level="INFO" message="Route 2 completed" doc:name="Logger"/>
    </processor-chain>
    <processor-chain>
      <logger message="Route 3 started" level="INFO" doc:name="Logger"/>
      <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(1000);
return message.payload;]]></scripting:script>
      </scripting:component>
      <logger level="INFO" message="Route 3 completed" doc:name="Logger"/>
    </processor-chain>
  </scatter-gather>
  <set-payload value="#[message.payloadAs(java.lang.String)]" mimeType="application/json" doc:name="Set Payload"/>
  <exception-strategy ref="Choice_Exception_Strategy" doc:name="Reference Exception Strategy"/>
</flow>

The exception generated is of class CompositeRoutingException which extends the Mule MessagingException to aggregate exceptions from different routes in the context of a single message router. Exceptions are correlated to each route through a sequential ID.

Exception Block:
[Image: catch-exception]

[Image: compositeexception]

We have used DataWeave to loop through the exceptions and get the error from each failed route.

[Image: dataweave]

<choice-exception-strategy name="Choice_Exception_Strategy">
  <catch-exception-strategy when="#[exception.causeMatches('*ValidationException*')]" doc:name="Validation Exception Strategy">
    <set-variable variableName="exceptionMessage" value="#[groovy:message.getExceptionPayload().getRootException().getMessage()]" doc:name="Set exceptionMessage"/>
    <set-property propertyName="http.status" value="500" doc:name="Property"/>
    <dw:transform-message doc:name="Transform Message">
      <dw:set-payload><![CDATA[%dw 1.0
  %output application/json
  ---
  {
  errorMessage: flowVars.exceptionMessage
  }]]></dw:set-payload>
    </dw:transform-message>
  </catch-exception-strategy>
  <catch-exception-strategy doc:name="CompositeRoutingException" when="#[exception.causeMatches('*CompositeRoutingException*')]">
    <set-payload value="#[exception]" doc:name="Set Payload"/>
    <dw:transform-message doc:name="Transform Message">
      <dw:set-payload><![CDATA[%dw 1.0
  %output application/json
  ---
  {
  errors: payload.exceptions map ((error, position) ->
  position ++ ":" ++ error.causeException.message
  )
  }]]></dw:set-payload>
    </dw:transform-message>

  </catch-exception-strategy>
  <catch-exception-strategy doc:name="Catch All Exception">
    <set-payload value="#[groovy:message.getExceptionPayload().getRootException().getMessage()]" doc:name="Set Payload"/>
  </catch-exception-strategy>
</choice-exception-strategy>


Final output on running the application:

[Image: compositeexception-output]

Note: We can also use #[exception.exceptions[0].getCauseException().getMessage()] to get the error from a specific failing route, here the one with route ID 0.
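
As a minimal sketch of how that expression could be used, the branch below logs the message instead of building a DataWeave response. It is meant to sit inside a choice exception strategy such as Choice_Exception_Strategy, and it assumes the route with ID 0 is one of the failed routes; if that route succeeded, the expression has no entry to read.

```xml
<catch-exception-strategy when="#[exception.causeMatches('*CompositeRoutingException*')]" doc:name="CompositeRoutingException">
  <!-- Reads the exception recorded for route ID 0 only (assumed to have failed) -->
  <logger level="ERROR" message="#[exception.exceptions[0].getCauseException().getMessage()]" doc:name="Logger"/>
</catch-exception-strategy>
```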

Download Mulesoft Scatter Gather Exception Handling Project


About Varun Goel

Varun Goel is a technology enthusiast with 6+ years of experience in the IT industry. He has been developing applications as a freelancer since finishing school. He currently works with one of the Fortune 100 companies and has extensive experience with Mule ESB, Tibco, HTML5, CSS, JSS, Android, Core Java, JSP, PHP, MySQL, AutoCAD, Maya, ZBrush, Photoshop, Flash CS and many more.

