Processing dependent files in Apache Camel
 
On a project I’m currently working on, we had to pick up files from an endpoint, but those files could depend on each other. Of course this is not an ideal situation: it would be preferable to process each file independently. Unfortunately, the system responsible for delivering the data was not capable of providing it in a different way.
An example of these files can be found below. As you can see, file-3.txt needs file-1.txt to be processed beforehand. After file-3.txt, file-4.txt can be processed, and so on. The last file, file-6.txt, should always be processed last, because it depends (indirectly) on all other files.
 
In the above example it still looks pretty simple, of course. We could just keep a static list that specifies the order in which the files are processed. However, in our situation there were a lot more files. Keeping such a list up to date would get really complex really fast, especially when files have multiple (nested) dependencies.
Sorting algorithms
Because of this complexity, we decided to make the application responsible for determining in which order the files should be processed.
This is possible with a topological sorting algorithm. Such an algorithm takes a directed acyclic graph (DAG) that expresses precedence among things (in our case: files) and produces a linear order in which every item comes after the items it depends on. A lot of articles can be found about topological sorting algorithms and various ways to implement them. I found the Topological Sort of Directed Acyclic Graph article from Baeldung really helpful.
From this article, we used “Algorithm 3: DFS Algorithm for Topological Sort” as our main inspiration. The only difference is that, besides the “visited” collection, we also keep track of the elements that we are currently visiting. This way we can detect cyclic dependencies.
In our case, all file types have been defined in classes that implement the FileWithDependencies interface. Via this interface, it’s possible to specify which of the other files that type is dependent on.
private static Deque<Class<? extends FileWithDependencies>> topologicalSort(Set<Class<? extends FileWithDependencies>> files) {
    Deque<Class<? extends FileWithDependencies>> deque = new ArrayDeque<>();
    Set<Class<? extends FileWithDependencies>> visiting = new HashSet<>();
    Set<Class<? extends FileWithDependencies>> visited = new HashSet<>();
    for (var current : files) {
        if (!visited.contains(current)) {
            topologicalSortRecursive(current, visiting, visited, deque);
        }
    }
    return deque;
}
private static void topologicalSortRecursive(Class<? extends FileWithDependencies> current,
                                             Set<Class<? extends FileWithDependencies>> visiting,
                                             Set<Class<? extends FileWithDependencies>> visited,
                                             Deque<Class<? extends FileWithDependencies>> deque) {
    if (visiting.contains(current)) {
        throw new RuntimeException("Cyclic dependency detected");
    }
    visiting.add(current);
    for (var dependency : FileWithDependencies.retrieveDependencies(current)) {
        if (!visited.contains(dependency)) {
            topologicalSortRecursive(dependency, visiting, visited, deque);
        }
    }
    deque.addLast(current);
    visited.add(current);
    visiting.remove(current);
}
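To see the algorithm run end to end, here is a minimal, self-contained sketch that applies the same DFS idea to plain file names instead of classes. It is not the project's code: the class name FileOrderSketch is hypothetical, and the dependency map is an assumption inferred from the example files (file-3 needs file-1, file-4 needs file-3, file-5 needs file-2, file-6 needs file-4 and file-5).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified variant: file names (Strings) instead of FileWithDependencies classes.
class FileOrderSketch {

    // Assumed dependency graph: key = file, value = files that must be processed first.
    static Map<String, List<String>> exampleDependencies() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("file-1.txt", List.of());
        deps.put("file-2.txt", List.of());
        deps.put("file-3.txt", List.of("file-1.txt"));
        deps.put("file-4.txt", List.of("file-3.txt"));
        deps.put("file-5.txt", List.of("file-2.txt"));
        deps.put("file-6.txt", List.of("file-4.txt", "file-5.txt"));
        return deps;
    }

    static List<String> topologicalSort(Map<String, List<String>> deps) {
        List<String> sorted = new ArrayList<>();
        Set<String> visiting = new HashSet<>();
        Set<String> visited = new HashSet<>();
        for (String file : deps.keySet()) {
            if (!visited.contains(file)) {
                visit(file, deps, visiting, visited, sorted);
            }
        }
        return sorted;
    }

    private static void visit(String current, Map<String, List<String>> deps,
                              Set<String> visiting, Set<String> visited, List<String> sorted) {
        // A file that is still "in progress" further up the call stack means we looped back to it.
        if (!visiting.add(current)) {
            throw new IllegalStateException("Cyclic dependency detected at " + current);
        }
        for (String dependency : deps.getOrDefault(current, List.of())) {
            if (!visited.contains(dependency)) {
                visit(dependency, deps, visiting, visited, sorted);
            }
        }
        // All dependencies are already in the list, so current can safely follow them.
        sorted.add(current);
        visited.add(current);
        visiting.remove(current);
    }

    public static void main(String[] args) {
        System.out.println(topologicalSort(exampleDependencies()));
    }
}
```

Every file ends up after all of its dependencies, and a map such as a -> b, b -> a makes the sketch throw instead of recursing forever.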
Creating a sorter using the topological sort algorithm
In Camel you can provide a Comparator via the sorter parameter on file-based endpoints. Now that we can sort our files and their dependencies with the topological sorting algorithm, implementing that Comparator is fairly easy.
We will use the sorted list to assign an “importance” to each file name: its position in the sorted order, so a lower value means the file is processed earlier. In the actual compare step, we can just compare these “importance” values:
public class TopologicalSorter implements Comparator<GenericFile<Object>> {
    private final Map<String, Integer> fileImportance;
    public TopologicalSorter() {
        Set<Class<? extends FileWithDependencies>> unsorted = FileWithDependencies.FILE_TYPES;
        fileImportance = Collections.unmodifiableMap(constructFileImportance(unsorted));
    }
    @Override
    public int compare(GenericFile<Object> file1, GenericFile<Object> file2) {
        return Integer.compare(getImportance(file1), getImportance(file2));
    }
    private int getImportance(GenericFile<Object> file) {
        return fileImportance.getOrDefault(file.getFileName(), -1);
    }
    private static Map<String, Integer> constructFileImportance(Set<Class<? extends FileWithDependencies>> unsorted) {
        var sorted = topologicalSort(unsorted);
        var sortedSize = sorted.size();
        Map<String, Integer> result = new LinkedHashMap<>(sortedSize);
        for (var i = 0; i < sortedSize; i++) {
            var file = sorted.removeFirst();
            var fileName = FileWithDependencies.retrieveFileName(file);
            result.put(fileName, i);
        }
        return result;
    }
    // ... The topological sort logic from the previous examples ...
}
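The ranking idea can be tried out without Camel on the classpath. The following is a small sketch under stated assumptions: RankComparatorSketch is a hypothetical stand-in that compares file names (Strings) rather than GenericFile objects, and the dependency-first list is passed in instead of being computed from FileWithDependencies.FILE_TYPES.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TopologicalSorter, working on file names only.
class RankComparatorSketch implements Comparator<String> {

    private final Map<String, Integer> rankByFileName = new HashMap<>();

    // sortedFileNames is assumed to already be in dependency-first order.
    RankComparatorSketch(List<String> sortedFileNames) {
        for (int i = 0; i < sortedFileNames.size(); i++) {
            rankByFileName.put(sortedFileNames.get(i), i);
        }
    }

    @Override
    public int compare(String file1, String file2) {
        return Integer.compare(rank(file1), rank(file2));
    }

    // Unknown files get -1 and therefore sort before every known file,
    // mirroring the getOrDefault(..., -1) call in the original class.
    private int rank(String fileName) {
        return rankByFileName.getOrDefault(fileName, -1);
    }

    // Returns a sorted copy of the batch; the input list is left untouched.
    static List<String> sortBatch(List<String> batch, List<String> sortedFileNames) {
        List<String> result = new ArrayList<>(batch);
        result.sort(new RankComparatorSketch(sortedFileNames));
        return result;
    }
}
```

One consequence worth noting: because unknown file names receive -1, they are placed at the front of the batch. Whether that is desirable depends on the situation; in the route shown in this article, the include filter only lets known files through.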
Applying the sorter in a Camel route
Now that we have the Comparator in place, we can supply it to a file-based endpoint via the sorter parameter. The #topologicalSorter value refers to a bean with that name in the Camel registry:
public class TopologicalSortRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        String includeFilter = FileWithDependencies.FILE_TYPES.stream()
                .map(FileWithDependencies::retrieveFileName)
                .collect(Collectors.joining(","));
        from("file:/tmp?antInclude=" + includeFilter + "&doneFileName=done&sorter=#topologicalSorter")
                .log("Processed ${headers.CamelFileName}");
    }
}
Full code and working example
The full code is available on the Integration & Application Talents Bitbucket. Besides the working code, it also contains a unit test that you can use to see the feature in action.
Please note that if you run the unit test a couple of times, you might see the files being processed in different orders in the log file. This is because there are multiple “correct” orders (valid topological orderings of the same graph) in which the files can be processed. Some examples:
- 2 – 5 – 1 – 3 – 4 – 6
- 1 – 3 – 4 – 2 – 5 – 6
- 1 – 2 – 3 – 5 – 4 – 6
- …
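Whether a given order is “correct” can be checked mechanically: each file may only appear once all of its dependencies have appeared. The sketch below illustrates that check. The class OrderValidatorSketch is hypothetical and the dependency graph is an assumption inferred from the example (3 needs 1, 4 needs 3, 5 needs 2, 6 needs 4 and 5).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper that checks a processing order against an assumed dependency graph.
class OrderValidatorSketch {

    // Assumed graph: key = file number, value = file numbers that must come first.
    static final Map<Integer, List<Integer>> DEPENDENCIES = Map.of(
            1, List.<Integer>of(),
            2, List.<Integer>of(),
            3, List.of(1),
            4, List.of(3),
            5, List.of(2),
            6, List.of(4, 5));

    // An order is valid when every file appears only after all of its dependencies
    // and every file in the graph has been processed.
    static boolean isValidOrder(List<Integer> order) {
        Set<Integer> processed = new HashSet<>();
        for (Integer file : order) {
            if (!processed.containsAll(DEPENDENCIES.getOrDefault(file, List.<Integer>of()))) {
                return false;
            }
            processed.add(file);
        }
        return processed.containsAll(DEPENDENCIES.keySet());
    }
}
```

Under this assumed graph, all three orders listed above pass the check, while an order such as 1 – 3 – 6 – 2 – 4 – 5 fails because file 6 comes before files 4 and 5.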
Things to keep in mind
Even though we got this logic working, there are still quite a few snags that you need to keep in mind when implementing similar logic.
- The most important thing is that the sorting, as we implemented it, only works correctly if the entire batch of files is processed in one go. In our case we added the doneFileName parameter so that the route is only triggered once all files have been fully uploaded to the directory. If you don’t specify this (or a similar) parameter, it could happen that, for example, file-1/3/6.txt are uploaded just before a poll, while file-2/4/5.txt finish uploading “too late” and end up in the next poll. In that situation file-6.txt will be processed after file-1/3.txt, but before file-2/4/5.txt, which is incorrect!
 
- Also, the sorting of a poll only happens on a single application instance, of course. This means that if you run this application in a replicated environment, you have to make sure that the entire batch of files is processed on a single application instance. Otherwise, similar to the above example, if node-1 processes file-1/3/6.txt while node-2 processes file-2/4/5.txt at the same time, a race condition determines whether file-6.txt is processed too early (before file-2/4/5.txt) or not.
 
- Error handling is also a major thing to think about. For example, what should the application do when file-5.txt fails? Should it skip file-6.txt entirely? In our case, we decided not to implement any additional logic. If file-5.txt fails, that will probably mean that file-6.txt fails as well (because some relations can’t be found), and both files will go into the error flow. Of course, that really depends on the situation. Other systems may be more error prone and require different behavior, but in our case this was out of scope.
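The first snag can be reproduced in a few lines. The sketch below is an illustration only: SplitBatchSketch is a hypothetical class, the ranking is assumed to be the dependency-first order file-1 through file-6, and a “poll” is simply modelled as a list of file names that gets sorted on its own.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo of why sorting per poll is not enough when a batch is split.
class SplitBatchSketch {

    // Assumed dependency-first ranking of the example files.
    static final List<String> RANKED = List.of(
            "file-1.txt", "file-2.txt", "file-3.txt",
            "file-4.txt", "file-5.txt", "file-6.txt");

    // Sorts one poll in isolation, the way a per-poll sorter would.
    static List<String> sortPoll(List<String> poll) {
        List<String> sorted = new ArrayList<>(poll);
        sorted.sort(Comparator.comparingInt((String name) -> RANKED.indexOf(name)));
        return sorted;
    }

    // Processes the polls one after another and returns the overall processing order.
    static List<String> processInPolls(List<List<String>> polls) {
        List<String> processed = new ArrayList<>();
        for (List<String> poll : polls) {
            processed.addAll(sortPoll(poll));
        }
        return processed;
    }
}
```

If file-1/3/6.txt arrive in the first poll and file-2/4/5.txt in the second, each poll is sorted correctly on its own, yet the overall order has file-6.txt in third place, ahead of three files it depends on. Processing all six files in a single poll gives the expected order.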
Conclusion
Researching the topological sorting algorithms and implementing the sorter to process dependent files in Apache Camel was a fun thing to do. In the end we got it working for a solution that meets our needs.
However, there are a lot of things to keep in mind while implementing a solution like this, especially in a more complex environment where applications run replicated, or where more complex error handling flows need to be implemented.
So my advice would still be to check whether the upstream system is capable of providing the data in such a way that each file can be processed completely independently of the others. But if this is not the case, maybe this article can help as a guide in the right direction!
 
   
   
  