Processing a dynamic number of flows in parallel in Mule 4
It can be very useful to process flows in parallel, because this can drastically improve the performance of a service. When the number of flows you need to process is static, you can use the Scatter-Gather component in Mule. However, when you don’t know at design time how many flows need to be triggered, the Scatter-Gather component unfortunately won’t do the trick.
Sequential processing using a For Each loop
Mule provides the For Each loop component out of the box. This component can be used to process a dynamic number of records, but unfortunately it processes all iterations sequentially. As an example, I have created a simple flow that retrieves the latitude and longitude for a dynamic number of cities using the MetaWeather API:
Below you can find an example of a request with the corresponding response:
Besides the response, this request also produced the following log lines. As you can see, the cities were retrieved sequentially: each retrieval only starts after the previous one has finished.
INFO 2019-02-27 11:36:26,651 Retrieving ["Amsterdam"]
INFO 2019-02-27 11:36:26,895 Finished retrieving ["Amsterdam"]
INFO 2019-02-27 11:36:26,899 Retrieving ["London"]
INFO 2019-02-27 11:36:27,090 Finished retrieving ["London"]
INFO 2019-02-27 11:36:27,094 Retrieving ["Paris"]
INFO 2019-02-27 11:36:27,395 Finished retrieving ["Paris"]
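To make the sequential behavior concrete outside of Mule, here is a minimal Java sketch of what the For Each loop does: each iteration only starts after the previous one has returned, so the total time is roughly the sum of the individual calls. The lookupCity method is a hypothetical stand-in for the MetaWeather request, simulated with a fixed sleep; it is not part of the demo application.

```java
import java.util.ArrayList;
import java.util.List;

class SequentialForEach {

    // Hypothetical stand-in for the MetaWeather request: sleeps to simulate latency.
    static String lookupCity(String city, long latencyMillis) {
        try {
            Thread.sleep(latencyMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return city + ":lat/long";
    }

    // Like Mule's For Each: one iteration at a time, in input order.
    static List<String> retrieveAll(List<String> cities, long latencyMillis) {
        List<String> results = new ArrayList<>();
        for (String city : cities) {
            System.out.println("Retrieving " + city);
            results.add(lookupCity(city, latencyMillis));
            System.out.println("Finished retrieving " + city);
        }
        return results;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> results = retrieveAll(List.of("Amsterdam", "London", "Paris"), 200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Roughly 3 x 200 ms, because the calls never overlap.
        System.out.println(results + " in ~" + elapsedMs + " ms");
    }
}
```

Just like in the log above, every "Retrieving" line is followed by its own "Finished retrieving" line before the next city starts.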
Parallel processing using a Group Based Aggregator
Now that we have seen two components (Scatter-Gather and For Each) that don’t do the trick for us, let’s look at how we can achieve the desired behavior. To do this, we will use the Mule Aggregators Module. Note that this module is not pre-installed in Anypoint Studio, so it must be added manually.
Again, we’ll start by adding a For Each loop to the flow to iterate over the cities in the request. Inside this For Each loop we add an Async block, so that Mule can process the iterations in parallel threads.
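As a rough Java analogy (not Mule’s actual implementation), an Async block inside a For Each loop behaves like submitting each iteration to a thread pool without waiting for it: the loop ends almost immediately while the work continues on other threads. The thread pool size and the run method are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncForEach {

    // Simulated remote call with a hypothetical fixed latency.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits one task per city without waiting, like an Async block inside For Each.
    // Returns {tasks finished when the loop ended, tasks finished after an explicit wait}.
    static int[] run(List<String> cities) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger done = new AtomicInteger();
        for (String city : cities) {
            pool.submit(() -> {
                System.out.println("Retrieving " + city);
                sleepQuietly(200);
                done.incrementAndGet();
            });
        }
        int finishedWhenLoopEnded = done.get(); // the loop itself never waited
        pool.shutdown();
        try {
            // Explicit wait; in the Mule flow, the aggregator and VM steps play this role.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {finishedWhenLoopEnded, done.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(List.of("Amsterdam", "London", "Paris"));
        System.out.println("Finished when loop ended: " + counts[0] + ", after waiting: " + counts[1]);
    }
}
```

The first count is 0 because nothing has finished by the time the loop ends, which is why the main flow needs an extra mechanism to wait for the parallel work.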
Now we just need to make the main flow wait until all parallel processes have finished, so it can continue with further processing (in our case, constructing the response payload). To do this, we’ll add a Group based aggregator component as the final step within the Async block, and configure the following settings on it:
- Set a Name. In our example we’ve used CitiesAggregator.
- Set a Group id. Because we have only one Group based aggregator, we can use the default #[correlationId]. However, when a single flow contains multiple of these Group based aggregator designs, you should use a more distinctive identifier, such as #['CITIES-AGGREGATOR-' ++ correlationId].
- Set the Group size to the number of cities in the initial payload. When we trigger the flow with 3 cities, the first 2 cities that reach the Group based aggregator step end up in the Incremental aggregation phase, while the last one ends up in the Aggregation complete phase.
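The aggregator settings above can be modelled in a few lines of Java. This is a simplified sketch, not the Aggregators Module’s implementation: payloads are collected per group id, every arrival below the group size corresponds to the Incremental aggregation phase, and the arrival that reaches the group size corresponds to the Aggregation complete phase and releases the collected payloads. The class and method names are hypothetical, and timeouts and eviction are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class GroupAggregator {
    private final int groupSize;
    private final Map<String, List<Object>> groups = new HashMap<>();

    GroupAggregator(int groupSize) {
        this.groupSize = groupSize;
    }

    // Adds a payload to its group. Returns empty during incremental aggregation,
    // and the complete group (removed from the map) once groupSize payloads have arrived.
    // Synchronized because parallel threads call it concurrently.
    synchronized Optional<List<Object>> add(String groupId, Object payload) {
        List<Object> group = groups.computeIfAbsent(groupId, id -> new ArrayList<>());
        group.add(payload);
        if (group.size() < groupSize) {
            return Optional.empty();   // Incremental aggregation phase
        }
        groups.remove(groupId);
        return Optional.of(group);     // Aggregation complete phase
    }

    public static void main(String[] args) {
        GroupAggregator cities = new GroupAggregator(3);
        String groupId = "CITIES-AGGREGATOR-" + "abc-123"; // example correlation id
        System.out.println(cities.add(groupId, "Amsterdam")); // Optional.empty
        System.out.println(cities.add(groupId, "London"));    // Optional.empty
        System.out.println(cities.add(groupId, "Paris"));     // Optional[[Amsterdam, London, Paris]]
    }
}
```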
The Incremental aggregation phase of the Group based aggregator can be left empty. In the Aggregation complete phase, we just need a trigger to continue the (synchronous) main flow. We will use a VM Publish for this: create a new VM Config, add a queue to it (for example named allCitiesRetrieved), and enter the same queue name in the VM Publish step configuration.
Finally, we need something that makes the main flow wait after the For Each loop until the VM Publish has been triggered. Add a VM Consume step right after the For Each loop, using the same VM Config and queue name as in the VM Publish step. You can also specify a timeout value here, which causes an exception when one or more of the parallel threads did not finish in time. After the VM Consume step, the payload is an array of all payloads that reached the group (similar to the payload after a Scatter-Gather block).
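Conceptually, the VM queue is a hand-off point between threads: the thread that completes the group publishes the aggregated payload, and the main flow blocks on a consume with a timeout. A java.util.concurrent.BlockingQueue captures that idea. This is an analogy under that assumption, not how the VM connector is implemented, and consumeOrFail is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class VmHandoff {

    // VM Consume analog: wait up to timeoutMillis, and fail if nothing was published in time.
    static List<String> consumeOrFail(BlockingQueue<List<String>> queue, long timeoutMillis)
            throws TimeoutException, InterruptedException {
        List<String> payload = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (payload == null) {
            throw new TimeoutException("No aggregated payload within " + timeoutMillis + " ms");
        }
        return payload;
    }

    public static void main(String[] args) throws Exception {
        // Stands in for the allCitiesRetrieved VM queue.
        BlockingQueue<List<String>> allCitiesRetrieved = new LinkedBlockingQueue<>();
        // VM Publish analog, executed by another thread once the group is complete.
        new Thread(() -> allCitiesRetrieved.offer(List.of("Amsterdam", "London", "Paris"))).start();
        System.out.println(consumeOrFail(allCitiesRetrieved, 1000));
    }
}
```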
Important: When you specify a timeout on the VM Consume step, make sure it “matches” the (eviction) timeout settings on both the Group based aggregator and the VM Publish step. If the VM Consume has already timed out, but the group still completes and the message is published within those other timeouts, the message stays on the queue. The next time the flow is used, its VM Consume will immediately pick up that old message from the previous execution. This causes unexpected behavior, such as old results being returned!
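The stale-message problem is easy to reproduce with the same queue analogy (again a sketch, not the VM connector itself): the first consume gives up before the late publish arrives, the publish still lands on the queue, and the next consume returns the previous run’s result immediately. The delays of 300 ms and 100 ms are arbitrary example values.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class StaleMessageDemo {

    // Returns what the second run's consume receives.
    static List<String> demo() throws InterruptedException {
        BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>();

        // Run 1: the aggregation completes after 300 ms, but the consume only waits 100 ms.
        Thread lateRun1 = new Thread(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            queue.offer(List.of("run-1 result")); // late VM Publish
        });
        lateRun1.start();
        List<String> run1 = queue.poll(100, TimeUnit.MILLISECONDS); // null: timed out
        System.out.println("Run 1 consumed: " + run1);
        lateRun1.join(); // the late publish still lands on the queue

        // Run 2: before its own aggregation has even started,
        // the consume returns the leftover message from run 1.
        List<String> run2 = queue.poll(100, TimeUnit.MILLISECONDS);
        System.out.println("Run 2 consumed: " + run2);
        return run2;
    }

    public static void main(String[] args) throws InterruptedException {
        demo();
    }
}
```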
The complete flow now looks something like the example below:
Let’s send another request to the service:
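Looking at the logging again, we can see that the requests were now actually processed in parallel: all three retrievals started within a few milliseconds of each other. The response time has been reduced as well; the retrievals took roughly 450 ms in total, compared to roughly 745 ms in the sequential run.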
INFO 2019-02-27 12:44:02,626 Retrieving ["Amsterdam"]
INFO 2019-02-27 12:44:02,627 Retrieving ["London"]
INFO 2019-02-27 12:44:02,629 Retrieving ["Paris"]
INFO 2019-02-27 12:44:02,855 Finished retrieving ["Paris"]
INFO 2019-02-27 12:44:02,875 Finished retrieving ["London"]
INFO 2019-02-27 12:44:03,077 Finished retrieving ["Amsterdam"]
Important: Please note that the order of the array after the VM Consume step is determined by the order in which the requests finished processing, not by the order in which they were triggered! When you execute the flow multiple times, you’ll see that the order of the response entities varies.
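Putting the pieces together, the following Java sketch mimics the whole design: a fan-out per city, a shared collector that publishes once the group size is reached, and a consume with a timeout. The per-city delays are made-up values chosen so that the completion order differs from the trigger order; with these delays the aggregated list comes back as Paris, London, Amsterdam, just like the completion order in the log above. Class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ParallelCities {

    static List<String> retrieveAll(Map<String, Long> delaysPerCity, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        int groupSize = delaysPerCity.size();                  // Group size = number of cities
        List<String> group = new ArrayList<>();                // aggregator state
        BlockingQueue<List<String>> allCitiesRetrieved = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(groupSize);

        // For Each + Async: one task per city, submitted without waiting.
        for (Map.Entry<String, Long> entry : delaysPerCity.entrySet()) {
            pool.submit(() -> {
                sleepQuietly(entry.getValue());                // simulated remote call
                synchronized (group) {
                    group.add(entry.getKey());                 // incremental aggregation
                    if (group.size() == groupSize) {           // aggregation complete
                        allCitiesRetrieved.offer(new ArrayList<>(group)); // VM Publish
                    }
                }
            });
        }
        pool.shutdown();

        // VM Consume with a timeout.
        List<String> result = allCitiesRetrieved.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (result == null) {
            throw new TimeoutException("Not all cities were retrieved in time");
        }
        return result;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> delays = new LinkedHashMap<>(); // keeps the trigger order
        delays.put("Amsterdam", 400L);
        delays.put("London", 200L);
        delays.put("Paris", 0L);
        // Completion order, not trigger order: Paris, London, Amsterdam with these delays.
        System.out.println(retrieveAll(delays, 2000));
    }
}
```

The list is built in the order in which the tasks reach the collector, which is why the result follows completion order rather than input order.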
The demo application that I used while writing this blog can be downloaded here.
Good luck processing your flows in parallel and improving their performance!