Java 8 Parallel Stream Performance
Java 8 parallel streams internally use the Fork/Join pool to split the source data into chunks and process them in parallel. So first, let us discuss the Fork/Join pool in Java 8.
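As a minimal sketch of that relationship, the snippet below sums 1..n once with a sequential stream and once with a parallel stream. The class and method names are made up for illustration; the parallel variant runs its chunks on the common ForkJoinPool, whose parallelism is printed for reference.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

class ParallelStreamSketch {

    // Sums 1..n with a sequential stream.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same computation, but the stream is switched to parallel mode, so the
    // range is split into chunks that run on the common ForkJoinPool.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 10_000_000L;
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println("sequential sum: " + sequentialSum(n));
        System.out.println("parallel sum:   " + parallelSum(n));
    }
}
```

Both methods return the same value; whether the parallel version is actually faster depends on the data size and the number of available cores, so it is worth measuring rather than assuming.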
Use the Parallel Fork/Join Framework
The Fork/Join framework in the java.util.concurrent package helps simplify writing parallelized code.
Before we look at the ForkJoinPool, I want to give a brief overview of how the fork and join principle works in general.
The fork and join principle consists of two steps which are performed recursively: the fork step and the join step.
Fork
A task that uses the fork and join principle can fork (split) itself into smaller subtasks which can be executed concurrently.
Join
When a task has split itself up into subtasks, the task waits until the subtasks have finished executing. Once the subtasks have finished executing, the task may join (merge) all the results into one result. The complete process is illustrated in the diagram below:
The framework is built on ForkJoinPool, an implementation of the ExecutorService interface, and provides an easy-to-use concurrent platform for exploiting multiple processors. It is very useful for modeling divide-and-conquer problems. Divide-and-conquer is a naturally parallel algorithmic technique: most often we can solve the sub-instances in parallel. This can lead to a significant amount of parallelism, since each level of recursion creates more instances to solve in parallel. Even if we only divide our instance into two sub-instances, each of those sub-instances will itself generate two more sub-instances, and this repeats.
This
approach is suitable for tasks that can be divided recursively and computed on
a smaller scale; the computed results are then combined. Dividing the task into
smaller tasks is forking, and merging the results from the smaller tasks is
joining.
The Fork/Join framework uses the work-stealing algorithm: when a worker
thread completes its work and is free, it takes (or “steals”) work from other
threads that are still busy doing some work. Initially, it will appear to you
that using Fork/Join is a complex task. Once you get familiar with it, however,
you’ll realize that it is conceptually easy and that it significantly
simplifies your job. The key is to recursively subdivide the task into smaller
chunks that can be processed by separate threads.
The figure visualizes how the task is recursively subdivided into smaller tasks and how the partial results are combined. As shown by the figure, a task is split into two subtasks, and then each subtask is again split into two subtasks, and so on until each subtask is small enough to be computed by a single thread. Once a thread completes the computation, it returns the result so that it can be combined with other results; in this way all the computed results are combined back.
Figure: How the Fork/Join framework uses divide-and-conquer to complete the task.
Briefly, the Fork/Join algorithm is designed as follows:
forkJoinAlgorithm() {
    split tasks;
    fork the tasks;
    join the tasks;
    compose the results;
}
Here is the pseudo-code of how these steps work:
doRecursiveTask(input) {
    if (the task is small enough to be handled by a thread) {
        compute the small task;
        if there is a result to return, do so
    }
    else {
        divide (i.e., fork) the task into two parts
        fork() the second task, call compute() on the first task, join() on the second task, combine both results and return
    }
}
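The pseudo-code above can be mapped onto a RecursiveTask roughly as follows. This is a sketch under stated assumptions: the problem (finding the largest value in an int array), the class names, and the threshold of 1,000 elements are all chosen purely for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MaxFinder {

    // Assumed cut-off: ranges of this size or smaller are scanned directly.
    static final int THRESHOLD = 1_000;

    static class MaxTask extends RecursiveTask<Integer> {
        private final int[] data;
        private final int from; // inclusive
        private final int to;   // exclusive

        MaxTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            // Small enough: compute the result in this thread.
            if (to - from <= THRESHOLD) {
                int max = data[from];
                for (int i = from + 1; i < to; i++) {
                    max = Math.max(max, data[i]);
                }
                return max;
            }
            // Otherwise divide the range into two parts.
            int mid = from + (to - from) / 2;
            MaxTask first = new MaxTask(data, from, mid);
            MaxTask second = new MaxTask(data, mid, to);
            second.fork();                  // run the second part asynchronously
            int firstMax = first.compute(); // work on the first part in this thread
            int secondMax = second.join();  // wait for the forked part
            return Math.max(firstMax, secondMax); // combine both results
        }
    }

    static int max(int[] data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("empty array");
        }
        return ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[50_000];
        data[31_337] = 99;
        System.out.println("max = " + max(data));
    }
}
```

Computing one half in the current thread instead of forking both halves keeps the calling worker busy and avoids creating a task whose only job is to wait.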
Useful Classes of the Fork/Join Framework
The following classes play key roles in the Fork/Join framework: ForkJoinPool, ForkJoinTask, RecursiveTask, and RecursiveAction. Let’s consider these classes in more detail.
To provide
effective parallel execution, the fork/join framework uses a pool of threads
called the ForkJoinPool, which manages worker threads of type ForkJoinWorkerThread.
The ForkJoinPool is the heart of the
framework. It is an implementation of the ExecutorService that manages worker
threads and provides us with tools to get information about the thread pool
state and performance.
Worker threads can execute only one task at a time, but the ForkJoinPool doesn’t create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue (deque) which stores tasks. This architecture is vital for balancing the threads’ workload with the help of the work-stealing algorithm: simply put, free threads try to “steal” work from the deques of busy threads.
Important Methods in the ForkJoinPool Class
- void execute(ForkJoinTask<?> task): Executes the given task asynchronously.
- <T> T invoke(ForkJoinTask<T> task): Executes the given task and returns the computed result.
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): Executes all the given tasks and returns a list of Future objects when all the tasks are completed.
- boolean isTerminated(): Returns true if all the tasks have completed following shutdown.
- int getParallelism(), int getPoolSize(), long getStealCount(), int getActiveThreadCount(): Status-checking methods.
- <T> ForkJoinTask<T> submit(Callable<T> task), <T> ForkJoinTask<T> submit(ForkJoinTask<T> task), ForkJoinTask<?> submit(Runnable task), <T> ForkJoinTask<T> submit(Runnable task, T result): Submits a task for execution. The overloaded versions take different types of tasks; each returns a ForkJoinTask, which is also a Future.
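To make the list above more concrete, here is a small sketch that exercises submit(), invoke(), invokeAll(), and the status-checking methods. The class name, the helper methods, and the trivial tasks are illustrative assumptions; the values printed by the status methods vary from run to run.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PoolMethodsSketch {

    // Pushes a few trivial tasks through the pool and returns the sum of their results.
    static int runTasks(ForkJoinPool pool) {
        Callable<Integer> two = () -> 2;
        Callable<Integer> three = () -> 3;
        Callable<Integer> ten = () -> 10;
        Callable<Integer> twenty = () -> 20;

        // submit(Callable) returns a ForkJoinTask, which is also a Future.
        ForkJoinTask<Integer> submitted = pool.submit(two);

        // invoke(ForkJoinTask) waits for the task and returns its result.
        int invoked = pool.invoke(ForkJoinTask.adapt(three));

        // invokeAll(Collection) returns once every task has completed.
        int batchSum = 0;
        List<Callable<Integer>> batch = Arrays.asList(ten, twenty);
        try {
            for (Future<Integer> f : pool.invokeAll(batch)) {
                batchSum += f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return submitted.join() + invoked + batchSum; // 2 + 3 + 30
    }

    // Shuts the pool down and reports whether it reached the terminated state.
    static boolean shutdownAndWait(ForkJoinPool pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.isTerminated();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println("result:         " + runTasks(pool));
        System.out.println("parallelism:    " + pool.getParallelism());
        System.out.println("pool size:      " + pool.getPoolSize());
        System.out.println("active threads: " + pool.getActiveThreadCount());
        System.out.println("steal count:    " + pool.getStealCount());
        System.out.println("terminated:     " + shutdownAndWait(pool));
    }
}
```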
In Java 8,
the most convenient way to get access to the instance of the ForkJoinPool is to
use its static method commonPool(). As its name suggests, this will provide a
reference to the common pool, which is a default thread pool for every
ForkJoinTask.
According to
Oracle’s documentation, using the predefined common pool reduces resource
consumption, since this discourages the creation of a separate thread pool per
task.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
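In Java 7, the same behavior can be achieved by creating a ForkJoinPool once, for example with new ForkJoinPool(2), and assigning it to a public static field of a utility class (PoolUtil here).
Now it can be easily accessed:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;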
With ForkJoinPool’s constructors, it is possible to create a custom thread pool with a specific level of parallelism, thread factory, and exception handler. In the example above, the pool has a parallelism level of 2. This means that the pool targets two worker threads actively running tasks, so it can keep up to two processor cores busy.
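The sketch below shows one way these pieces could fit together. The nested PoolUtil class is a hypothetical reconstruction of the holder class named in the text, and customPool() is an illustrative helper that uses the four-argument ForkJoinPool constructor (parallelism, thread factory, exception handler, async mode).

```java
import java.util.concurrent.ForkJoinPool;

class PoolAccessSketch {

    // Hypothetical holder mirroring the PoolUtil name from the text:
    // one shared pool with a parallelism level of 2.
    static class PoolUtil {
        static final ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    }

    // Builds a pool with an explicit thread factory and exception handler.
    static ForkJoinPool customPool(int parallelism) {
        Thread.UncaughtExceptionHandler handler =
                (thread, error) -> System.err.println(thread.getName() + " failed: " + error);
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                handler,
                false); // false keeps the default (non-async) scheduling mode
    }

    public static void main(String[] args) {
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        ForkJoinPool sharedPool = PoolUtil.forkJoinPool;
        ForkJoinPool custom = customPool(3);

        System.out.println("common pool parallelism: " + commonPool.getParallelism());
        System.out.println("shared pool parallelism: " + sharedPool.getParallelism());
        System.out.println("custom pool parallelism: " + custom.getParallelism());

        // Pools created with a constructor should be shut down when no longer needed.
        sharedPool.shutdown();
        custom.shutdown();
    }
}
```

Unlike pools created through a constructor, the common pool does not need to be shut down by application code.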
ForkJoinTask<V> is a lightweight thread-like entity
representing a task that defines methods
such as
fork() and join(). ForkJoinTask is the base type for tasks executed inside
ForkJoinPool.
In practice,
one of its two subclasses should be extended: the RecursiveAction for void tasks and the RecursiveTask<V> for tasks that return a value.
They both
have an abstract method compute() in which the task’s logic is defined.
Important Methods in the ForkJoinTask Class
- boolean cancel(boolean mayInterruptIfRunning): Attempts to cancel the execution of the task.
- ForkJoinTask<V> fork(): Arranges to execute the task asynchronously.
- V join(): Returns the result of the computation when the computation is done.
- V get(): Returns the result of the computation; waits if the computation is not complete.
- V invoke(): Starts the execution of the task, waits until the computation is complete, and returns the result.
- static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks): Forks all the given tasks and returns when each of them has completed.
- boolean isCancelled(): Returns true if the task was cancelled before it completed normally.
- boolean isDone(): Returns true if the task is completed.
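A short sketch of these methods in use is shown below. The Square task and the helper method names are invented for illustration; when fork() is called from a thread that is not a pool worker, as here, the task runs in the common pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class TaskMethodsSketch {

    // Trivial task used only to exercise the ForkJoinTask API.
    static class Square extends RecursiveTask<Integer> {
        private final int n;

        Square(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    // fork() schedules the task asynchronously; join() waits for its result.
    static int forkThenJoin(int n) {
        Square task = new Square(n);
        task.fork();
        return task.join();
    }

    // invoke() starts the task, waits for completion, and returns the result.
    static int invokeDirectly(int n) {
        return new Square(n).invoke();
    }

    // invokeAll(Collection) returns once every task in the collection is done.
    static int invokeAllAndSum(int... values) {
        List<Square> tasks = new ArrayList<>();
        for (int v : values) {
            tasks.add(new Square(v));
        }
        ForkJoinTask.invokeAll(tasks);
        int sum = 0;
        for (Square task : tasks) {
            if (!task.isDone()) {
                throw new IllegalStateException("task should be complete");
            }
            sum += task.join(); // already complete, so this returns immediately
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(forkThenJoin(7));
        System.out.println(invokeDirectly(5));
        System.out.println(invokeAllAndSum(1, 2, 3));
    }
}
```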
Let’s look at how you can use the Fork/Join framework in problem solving. Here are the steps to use the framework:
First, check whether the problem is suitable for the Fork/Join framework or not. Remember: the Fork/Join framework is not suitable for all kinds of tasks. This framework is suitable if your problem fits this description:
- The problem can be designed as a recursive task where the task can be subdivided into smaller units and the results can be combined together.
- The subdivided tasks are independent and can be computed separately without the need for communication between the tasks while the computation is in progress. (Of course, after the computation is over, you will need to join them together.)
If the problem you want to solve can be modeled recursively, then define a task class that extends either RecursiveTask or RecursiveAction. If a task returns a result, extend RecursiveTask; otherwise extend RecursiveAction.
Override the compute() method in the newly defined task class. The compute() method performs the task directly if it is small enough to be executed; otherwise it splits the task into subtasks and invokes them. The subtasks can be invoked with either the invokeAll() or the fork() method (use fork() when the subtask returns a value). Use the join() method to get the computed results (if you used the fork() method earlier).
Merge the results, if any were computed by the subtasks. Then instantiate ForkJoinPool, create an instance of the task class, and start the execution of the task using the invoke() method on the ForkJoinPool instance.
Now let’s try solving the problem of how to sum 1..N where N is a large number. One approach is to subdivide the sum computation iteratively into ten sub-ranges, compute the sum for each sub-range, and then compute the sum of the partial sums. Alternatively, you can solve this problem recursively using the Fork/Join framework.
An example is attached.
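A minimal sketch of the recursive approach might look like the following. It is not the attached example; the class names and the threshold of 10,000 numbers per leaf task are assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumOfN {

    // Assumed cut-off: ranges with fewer numbers than this are summed directly.
    static final long THRESHOLD = 10_000;

    static class RangeSum extends RecursiveTask<Long> {
        private final long from; // inclusive
        private final long to;   // inclusive

        RangeSum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from < THRESHOLD) {
                long sum = 0;
                for (long i = from; i <= to; i++) {
                    sum += i;
                }
                return sum;
            }
            long mid = from + (to - from) / 2;
            RangeSum first = new RangeSum(from, mid);
            RangeSum second = new RangeSum(mid + 1, to);
            second.fork();                   // second half runs asynchronously
            long firstSum = first.compute(); // first half runs in this thread
            long secondSum = second.join();  // wait for the forked half
            return firstSum + secondSum;     // merge the partial sums
        }
    }

    // Instantiates a pool, invokes the root task, and shuts the pool down again.
    static long sumTo(long n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new RangeSum(1, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println("fork/join sum: " + sumTo(n));
        System.out.println("closed form:   " + n * (n + 1) / 2);
    }
}
```

The closed-form value n(n+1)/2 is printed only as a cross-check on the computed sum.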
So far we used RecursiveTask; however, if a task does not return a value, then we should use RecursiveAction. Let’s implement a search program using RecursiveAction. Assume that you have a big array (say, of 10,000 items) and you want to search for a key item. You can use the Fork/Join framework to split the task into several subtasks and execute them in parallel.
An example is attached.
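One possible sketch of such a search is shown below; it is not the attached example. Because a RecursiveAction returns nothing, this version records the match in a shared AtomicInteger, which is a design assumption made here, as are the class names, the sampleData() helper, and the threshold of 1,000 elements. If the key occurs more than once, any one of the matching indexes may be reported.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

class ArraySearch {

    // Assumed cut-off: ranges of this size or smaller are scanned directly.
    static final int THRESHOLD = 1_000;

    static class SearchAction extends RecursiveAction {
        private final int[] data;
        private final int from; // inclusive
        private final int to;   // exclusive
        private final int key;
        private final AtomicInteger found; // holds -1 until a match is recorded

        SearchAction(int[] data, int from, int to, int key, AtomicInteger found) {
            this.data = data;
            this.from = from;
            this.to = to;
            this.key = key;
            this.found = found;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                for (int i = from; i < to; i++) {
                    if (data[i] == key) {
                        found.compareAndSet(-1, i); // keep the first recorded match
                        return;
                    }
                }
                return;
            }
            int mid = from + (to - from) / 2;
            // invokeAll runs both halves and returns when both are complete.
            invokeAll(new SearchAction(data, from, mid, key, found),
                      new SearchAction(data, mid, to, key, found));
        }
    }

    // Returns an index of key in data, or -1 if the key is absent.
    static int indexOf(int[] data, int key) {
        AtomicInteger found = new AtomicInteger(-1);
        ForkJoinPool.commonPool().invoke(new SearchAction(data, 0, data.length, key, found));
        return found.get();
    }

    // Illustrative test data: element i holds the value 2 * i.
    static int[] sampleData(int size) {
        int[] data = new int[size];
        for (int i = 0; i < size; i++) {
            data[i] = 2 * i;
        }
        return data;
    }

    public static void main(String[] args) {
        int[] data = sampleData(10_000);
        System.out.println("index of 8642: " + indexOf(data, 8642));
        System.out.println("index of 7:    " + indexOf(data, 7));
    }
}
```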