Wednesday 2 January 2019

java 8 parallel stream performance

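Java 8 parallel streams internally use the common ForkJoinPool: the source data is split into chunks, the chunks are processed concurrently by the pool's worker threads, and the partial results are combined.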
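
As a small illustration of that claim, the sketch below sums the same range once sequentially and once as a parallel stream. The class name ParallelStreamSum and the helper method names are made up for this example; the only assumption is the default behaviour, where parallel() hands the work to the common pool.

```java
import java.util.stream.LongStream;

class ParallelStreamSum {

    // Sums 1..n on the calling thread only.
    static long sumSequential(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Sums 1..n as a parallel stream: the range is split into chunks
    // that are summed by common-pool worker threads (and the caller),
    // then the partial sums are combined.
    static long sumParallel(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println("sequential = " + sumSequential(n));
        System.out.println("parallel   = " + sumParallel(n));
    }
}
```

Both calls return the same value; only the way the work is scheduled differs. Whether the parallel version is actually faster depends on the data size and the cost of each element, so it is worth measuring rather than assuming.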

So let us first discuss the Fork/Join pool in Java 8.



Use the Parallel Fork/Join Framework
The Fork/Join framework in the java.util.concurrent package helps simplify writing parallelized code.

Before we look at the ForkJoinPool, here is a brief overview of how the fork and join principle works in general.

The fork and join principle consists of two steps which are performed recursively. These two steps are the fork step and the join step.


Fork

A task that uses the fork and join principle can fork (split) itself into smaller subtasks which can be executed concurrently.

Join

When a task has split itself up into subtasks, the task waits until the subtasks have finished executing.
Once the subtasks have finished executing, the task may join (merge) all the results into one result.

The framework is an implementation of the ExecutorService interface and provides an easy-to-use concurrent platform for exploiting multiple processors. It is very useful for modeling divide-and-conquer problems. Divide-and-conquer is a naturally parallel algorithmic technique: most often the sub-instances can be solved in parallel. This can lead to a significant amount of parallelism, since each level of recursion creates more instances to solve in parallel. Even if we only divide our instance into two sub-instances, each of those will itself generate two more sub-instances, and this repeats.

This approach is suitable for tasks that can be divided recursively and computed on a smaller scale; the computed results are then combined. Dividing the task into smaller tasks is forking, and merging the results from the smaller tasks is joining.

The Fork/Join framework uses the work-stealing algorithm: when a worker thread completes its work and is free, it takes (or “steals”) work from other threads that are still busy doing some work. Initially, it will appear to you that using Fork/Join is a complex task. Once you get familiar with it, however, you’ll realize that it is conceptually easy and that it significantly simplifies your job. The key is to recursively subdivide the task into smaller chunks that can be processed by separate threads.
Picture how the task is recursively subdivided into smaller tasks and how the partial results are combined. A task is split into two subtasks, and then each subtask is again split into two subtasks, and so on until each subtask is small enough to be computed directly by a single thread. Once a thread completes its computation, it returns the result so that it can be combined with the other results; in this way all the computed results are combined back into one.
Figure: How the Fork/Join framework uses divide-and-conquer to complete the task.
Briefly, the Fork/Join algorithm is designed as follows:

forkJoinAlgorithm() {
    split tasks;
    fork the tasks;
    join the tasks;
    compose the results;
}

Here is the pseudo-code of how these steps work:

doRecursiveTask(input) {
    if (the task is small enough to be handled by a thread) {
        compute the small task;
        if there is a result to return, do so
    }
    else {
        divide the task into two parts
        fork() the first part so it can run asynchronously
        call compute() on the second part in the current thread
        join() the first part, combine both results and return
    }
}
Useful Classes of the Fork/Join Framework

The following classes play key roles in the Fork/Join framework: ForkJoinPool, ForkJoinTask, RecursiveTask, and RecursiveAction. Let’s consider these classes in more detail.
To provide effective parallel execution, the fork/join framework uses a pool of threads called the ForkJoinPool, which manages worker threads of type ForkJoinWorkerThread.
The ForkJoinPool is the heart of the framework. It is an implementation of the ExecutorService that manages worker threads and provides us with tools to get information about the thread pool state and performance.

Worker threads can execute only one task at a time, but the ForkJoinPool doesn’t create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue (deque) which stores tasks. This architecture is vital for balancing the threads’ workload with the help of the work-stealing algorithm: put simply, free threads try to “steal” work from the deques of busy threads.
Important Methods in the ForkJoinPool Class.
  • void execute(ForkJoinTask<?> task): Arranges for asynchronous execution of the given task.
  • <T> T invoke(ForkJoinTask<T> task): Executes the given task and returns its computed result on completion.
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): Executes all the given tasks and returns a list of Future objects once all the tasks have completed.
  • boolean isTerminated(): Returns true if all tasks have completed following shutdown.
  • Status-checking methods:
      int getParallelism(): Returns the targeted parallelism level of the pool.
      int getPoolSize(): Returns the number of worker threads that have started but not yet terminated.
      long getStealCount(): Returns an estimate of the total number of tasks stolen from one thread's work queue by another.
      int getActiveThreadCount(): Returns an estimate of the number of threads currently stealing or executing tasks.
  • submit(...): Submits a task for execution and returns a ForkJoinTask (which is also a Future) representing it. The overloaded versions take different types of tasks:
      <T> ForkJoinTask<T> submit(Callable<T> task)
      <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
      ForkJoinTask<?> submit(Runnable task)
      <T> ForkJoinTask<T> submit(Runnable task, T result)
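
To make a few of these methods concrete, here is a minimal sketch that submits one trivial job to a private pool and prints the status values. The class name PoolMethodsDemo, the helper runInPool, and the job itself are assumptions chosen for illustration. The status numbers are estimates and can differ from run to run, so no particular values should be expected for them.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

class PoolMethodsDemo {

    // Runs one trivial job in a private pool and returns its result.
    static int runInPool(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Typing the lambda as Callable picks the submit(Callable<T>) overload.
            Callable<Integer> job = () -> 6 * 7;

            // submit() returns immediately with a ForkJoinTask handle.
            ForkJoinTask<Integer> task = pool.submit(job);

            // join() blocks until the job has finished and returns its value.
            int answer = task.join();

            // Status-checking methods (estimates, useful for monitoring).
            System.out.println("parallelism  = " + pool.getParallelism());
            System.out.println("pool size    = " + pool.getPoolSize());
            System.out.println("active       = " + pool.getActiveThreadCount());
            System.out.println("steal count  = " + pool.getStealCount());
            return answer;
        } finally {
            // A private pool should be shut down once it is no longer needed.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("answer = " + runInPool(2));
    }
}
```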
In Java 8, the most convenient way to get access to an instance of the ForkJoinPool is to use its static method commonPool(). As its name suggests, this provides a reference to the common pool, which is the default thread pool for every ForkJoinTask that is not explicitly submitted to another pool.
According to Oracle’s documentation, using the predefined common pool reduces resource consumption, since this discourages the creation of a separate thread pool per task.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
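
A short sketch of using the common pool follows. The class name CommonPoolDemo and the tiny Square task are hypothetical and exist only to have something to run; commonPool(), getCommonPoolParallelism(), and invoke() are the standard ForkJoinPool methods.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CommonPoolDemo {

    // A deliberately trivial task: it just squares a number.
    static class Square extends RecursiveTask<Integer> {
        private final int value;

        Square(int value) {
            this.value = value;
        }

        @Override
        protected Integer compute() {
            return value * value;
        }
    }

    // Runs the task in the shared common pool and waits for the result.
    static int squareInCommonPool(int value) {
        return ForkJoinPool.commonPool().invoke(new Square(value));
    }

    public static void main(String[] args) {
        // Every call returns the same shared instance.
        System.out.println("same instance = "
                + (ForkJoinPool.commonPool() == ForkJoinPool.commonPool()));
        System.out.println("common parallelism = "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("7 squared = " + squareInCommonPool(7));
    }
}
```

Because the common pool is shared by the whole JVM (including parallel streams), it should not be shut down by application code, and long blocking jobs in it can slow down unrelated work.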

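In Java 7, which has no common pool, the same behavior can be achieved by creating a ForkJoinPool once and assigning it to a public static field of a utility class (PoolUtil in this example).

It can then be easily accessed: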

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
With ForkJoinPool’s constructors, it is possible to create a custom thread pool with a specific level of parallelism, thread factory, and exception handler. In the example above, the pool has a parallelism level of 2. This means the pool aims to keep 2 worker threads active, so it uses at most about 2 processor cores at a time.
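
The PoolUtil class itself is not shown in the post, so the following is only a guess at its shape: a utility class holding one shared pool with a parallelism level of 2. The second field, customPool, is a hypothetical addition showing the four-argument constructor with a thread factory and an exception handler.

```java
import java.util.concurrent.ForkJoinPool;

class PoolUtilDemo {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
        System.out.println("parallelism = " + forkJoinPool.getParallelism());
        System.out.println("custom parallelism = " + PoolUtil.customPool.getParallelism());
    }
}

// Assumed shape of the utility class referred to in the text.
class PoolUtil {

    // One shared pool for the whole application, parallelism level 2.
    public static final ForkJoinPool forkJoinPool = new ForkJoinPool(2);

    // Hypothetical variant using the full constructor:
    // parallelism, thread factory, uncaught-exception handler, asyncMode.
    public static final ForkJoinPool customPool = new ForkJoinPool(
            2,
            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            (thread, error) -> System.err.println(thread.getName() + " failed: " + error),
            false);
}
```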

ForkJoinTask<V> is a lightweight thread-like entity representing a task that defines methods such as fork() and join(). ForkJoinTask is the base type for tasks executed inside a ForkJoinPool.
In practice, one of its two subclasses should be extended: RecursiveAction for void tasks and RecursiveTask<V> for tasks that return a value.

They both have an abstract method compute() in which the task’s logic is defined.
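Important Methods in the ForkJoinTask Class.

  • boolean cancel(boolean mayInterruptIfRunning): Attempts to cancel the execution of the task.
  • ForkJoinTask<V> fork(): Arranges to execute the task asynchronously, in the pool the current task is running in or, when called from outside any pool, in the common pool.
  • V join(): Returns the result of the computation when the computation is done; failures are reported as unchecked exceptions.
  • V get(): Waits if the computation is not complete, then returns its result; throws the checked InterruptedException and ExecutionException.
  • V invoke(): Starts executing the task, waits until the computation is complete, and returns the result.
  • static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks): Forks all the tasks in the collection and returns once every task is done or an exception is encountered.
  • boolean isCancelled(): Returns true if the task was cancelled before it completed normally.
  • boolean isDone(): Returns true if the task is completed.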
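
The sketch below exercises fork(), join(), and isDone() on a deliberately naive Fibonacci task. The class name Fibonacci and the helper fib are assumptions for illustration. The task has no sequential cutoff, so it demonstrates the method calls and is not a fast way to compute Fibonacci numbers.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class Fibonacci extends RecursiveTask<Long> {
    private final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Long compute() {
        if (n <= 1) {
            return (long) n; // base cases: fib(0) = 0, fib(1) = 1
        }
        Fibonacci left = new Fibonacci(n - 1);
        Fibonacci right = new Fibonacci(n - 2);
        left.fork();                         // schedule the left half asynchronously
        long rightResult = right.compute();  // work on the right half in this thread
        long leftResult = left.join();       // wait for the forked half
        return leftResult + rightResult;
    }

    // Convenience helper: runs the task in the common pool.
    static long fib(int n) {
        return ForkJoinPool.commonPool().invoke(new Fibonacci(n));
    }

    public static void main(String[] args) {
        Fibonacci task = new Fibonacci(20);
        System.out.println("done before = " + task.isDone());
        long result = ForkJoinPool.commonPool().invoke(task);
        System.out.println("fib(20) = " + result);
        System.out.println("done after  = " + task.isDone());
    }
}
```

Calling compute() directly on one half and fork() on the other keeps the current thread busy instead of parking it while both halves run elsewhere.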

Let’s ascertain how you can use Fork/Join framework in problem solving. Here are the steps to use the framework:

First, check whether the problem is suitable for the Fork/Join framework or not.
Remember:

The Fork/Join framework is not suitable for all kinds of tasks. This framework is suitable if your problem fits this description:

The problem can be designed as a recursive task where the task can be subdivided into smaller units and the results can be combined together.

The subdivided tasks are independent and can be computed separately without the need for communication between the tasks when computation is in process. (Of course, after the computation is over, you will need to join them together.)

If the problem you want to solve can be modeled recursively, then define a task class that extends either RecursiveTask or RecursiveAction. If a task returns a result, extend RecursiveTask; otherwise extend RecursiveAction.

Override the compute() method in the newly defined task class. The compute() method performs the task directly if it is small enough to be executed; otherwise it splits the task into subtasks and invokes them. The subtasks can be invoked with either the invokeAll() or the fork() method (use fork() when the subtask returns a value). Use the join() method to get the computed results (if you used the fork() method earlier).

Merge the results, if any were computed by the subtasks. Then instantiate a ForkJoinPool, create an instance of the task class, and start the execution of the task by calling the invoke() method on the ForkJoinPool instance.

Now let’s try solving the problem of how to sum 1..N where N is a large number. One approach is to subdivide the sum computation iteratively into ten sub-ranges, compute the sum for each sub-range, and then compute the sum of the partial sums. Alternatively, you can solve this problem recursively using the Fork/Join framework.

An example is attached.
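
A minimal sketch of the recursive approach could look like the following. It is an illustration under assumptions, not necessarily the attached example: the class name SumTask, the helper sumUpTo, and the threshold of 10,000 are all choices made here.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    // Ranges at or below this size are summed directly (assumed value).
    private static final long THRESHOLD = 10_000;

    private final long from; // inclusive
    private final long to;   // inclusive

    SumTask(long from, long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: compute the partial sum in this thread.
            long sum = 0;
            for (long i = from; i <= to; i++) {
                sum += i;
            }
            return sum;
        }
        // Too large: split the range in two halves.
        long mid = from + (to - from) / 2;
        SumTask left = new SumTask(from, mid);
        SumTask right = new SumTask(mid + 1, to);
        left.fork();                      // run the left half asynchronously
        long rightSum = right.compute();  // compute the right half here
        long leftSum = left.join();       // wait for the left half
        return leftSum + rightSum;        // merge the partial results
    }

    // Creates a pool, runs the task with invoke(), and shuts the pool down.
    static long sumUpTo(long n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(1, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sum(1..1000000) = " + sumUpTo(1_000_000L));
    }
}
```

The threshold controls the trade-off between parallelism and task overhead: if it is too small, the cost of creating tasks dominates; if it is too large, some cores stay idle.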

So far we have used RecursiveTask; however, if a task does not return a value, then we should use RecursiveAction. Let’s implement a search program using RecursiveAction. Assume that you have a big array (say of 10,000 items) and you want to search it for a key item. You can use the Fork/Join framework to split the task into several subtasks and execute them in parallel.
An example is attached.
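
One possible sketch of such a search is shown below; it is an assumption-laden illustration rather than the attached example. The class name SearchTask, the helper indexOf, and the threshold are made up here. Because a RecursiveAction returns nothing, the sketch reports the hit through a shared AtomicInteger, which is the only state the subtasks have in common. If the key occurs more than once, any one of its positions may be reported.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

class SearchTask extends RecursiveAction {
    // Slices at or below this size are scanned directly (assumed value).
    private static final int THRESHOLD = 1_000;

    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive
    private final int key;
    private final AtomicInteger foundIndex; // shared result holder, -1 = not found

    SearchTask(int[] data, int from, int to, int key, AtomicInteger foundIndex) {
        this.data = data;
        this.from = from;
        this.to = to;
        this.key = key;
        this.foundIndex = foundIndex;
    }

    @Override
    protected void compute() {
        if (foundIndex.get() >= 0) {
            return; // another subtask has already found the key
        }
        if (to - from <= THRESHOLD) {
            // Small enough: scan this slice linearly.
            for (int i = from; i < to; i++) {
                if (data[i] == key) {
                    foundIndex.compareAndSet(-1, i); // record the first reported hit
                    return;
                }
            }
            return;
        }
        // Too large: split into two halves and run both to completion.
        int mid = (from + to) >>> 1;
        invokeAll(new SearchTask(data, from, mid, key, foundIndex),
                  new SearchTask(data, mid, to, key, foundIndex));
    }

    // Returns an index of key in data, or -1 if it is absent.
    static int indexOf(int[] data, int key) {
        AtomicInteger found = new AtomicInteger(-1);
        ForkJoinPool.commonPool().invoke(new SearchTask(data, 0, data.length, key, found));
        return found.get();
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = 2 * i; // distinct even numbers
        }
        System.out.println("index of 8642 = " + indexOf(data, 8642));
        System.out.println("index of 7    = " + indexOf(data, 7));
    }
}
```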
