Callable, Executors, ExecutorService, ThreadPool, and Future
Callable is an interface that declares only one method: call(). Its full
signature is V call() throws Exception. It represents a task that is executed
by a thread and that returns a value once it completes. If the call() method
cannot compute its result, it throws an Exception.
To execute a task using the Callable object, you first create a thread pool. A
thread pool is a collection of threads that can execute tasks. You create a
thread pool using the Executors utility class. This class provides methods to
get instances of thread pools, thread factories, etc.
The ExecutorService interface extends the Executor interface and provides
services such as shutting down the executor and producing Future objects. Some
tasks may take considerable time to complete, so when you submit a task to the
executor service, you immediately get back a Future object instead of the
result itself.
Future represents an object that will hold a value computed by a task at some
point in the future (i.e., the value becomes available once the task
completes). You can use the isDone() method in the Future interface to check if
the task is complete and then use the get() method to fetch the task result. If
you call the get() method directly while the task is not complete, the method
blocks until the task completes and returns the value once available.
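
The interplay between isDone() and get() described above can be sketched as follows. This is a minimal illustration, not part of the original examples: the class name FuturePollingSketch, the helper awaitResult(), and the 200 ms sleep that stands in for slow work are all assumptions made for the sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FuturePollingSketch {
    // Hypothetical helper: submits a slow task, polls isDone(), then fetches the result.
    static int awaitResult() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> slowTask = () -> {
                Thread.sleep(200); // stand-in for a long computation
                return 42;
            };
            Future<Integer> future = executor.submit(slowTask);

            // isDone() never blocks; it only reports whether the task has finished
            while (!future.isDone()) {
                System.out.println("Task still running...");
                Thread.sleep(50);
            }
            // the task has finished, so get() returns without waiting
            return future.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Result: " + awaitResult());
    }
}
```

Polling is shown here only to make isDone() visible. When the caller has nothing else to do, calling get() directly is simpler, and get(long timeout, TimeUnit unit) is available when the wait should be bounded.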
import java.util.concurrent.Callable;

public class Factorial implements Callable<Long> {
    long n;

    public Factorial(long n) {
        this.n = n;
    }

    @Override
    public Long call() throws Exception {
        if (n <= 0) {
            throw new Exception("for finding factorial, N should be > 0");
        }
        long fact = 1;
        for (long longVal = 1; longVal <= n; longVal++) {
            fact *= longVal;
        }
        return fact;
    }
}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableTest {
    public static void main(String[] args) throws Exception {
        // the value for which we want to find the factorial
        long N = 4;
        // get a callable task to be submitted to the executor service
        Callable<Long> task = new Factorial(N);
        // create an ExecutorService backed by a single worker thread
        ExecutorService es = Executors.newSingleThreadExecutor();
        // submit the task to the executor service and store the Future object
        Future<Long> future = es.submit(task);
        // get() blocks until the computation is complete and then returns the result
        System.out.printf("factorial of %d is %d", N, future.get());
        // done. shut down the executor service since we don't need it anymore
        if (!es.isShutdown()) {
            es.shutdown();
        }
        System.out.println(" and Service is shutdown");
    }
}
In this program, you have a Factorial class that implements Callable. Since the
task is to compute the factorial of a number N, the task needs to return a
result. You use the Long type for the factorial value, so you implement
Callable<Long>. Inside the Factorial class, you define the call() method that
actually performs the task (here, computing the factorial of the given number).
If the given value N is negative or zero, you don’t perform the task and throw
an exception, which reaches the caller through Future.get(). Otherwise, you
loop from 1 to N and compute the factorial value.
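
What the caller actually sees when call() throws can be sketched as follows. This is an illustrative sketch under stated assumptions: the class FailingTaskSketch and the helper failureMessage() are hypothetical names, and the failing task is a stand-in lambda rather than the Factorial class itself. The point it shows is that Future.get() reports a failed task by throwing an ExecutionException whose cause is the exception thrown inside call().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailingTaskSketch {
    // Hypothetical helper: returns the message of the exception thrown inside
    // call(), or null if the task completed normally.
    static String failureMessage(Callable<Long> task) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Long> future = executor.submit(task);
            future.get();
            return null;
        } catch (ExecutionException e) {
            // getCause() is the original exception thrown by call()
            return e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // stand-in for a task that rejects its input, as Factorial does for N <= 0
        Callable<Long> badTask = () -> {
            throw new Exception("for finding factorial, N should be > 0");
        };
        System.out.println("Task failed: " + failureMessage(badTask));
    }
}
```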
In the CallableTest class, you first create an instance of the Factorial class.
You then need to execute this task.

For the sake of simplicity, you get a single-threaded executor by calling the
newSingleThreadExecutor() method in the Executors class. Note that you could
use other methods such as newFixedThreadPool(nThreads) to create a thread pool
with multiple threads, depending on the level of parallelism you need. Once you
get an ExecutorService, you submit the task for execution. ExecutorService
abstracts details such as when the task is executed, how the task is assigned
to the threads, etc. You get a reference to Future<Long> when you call the
submit(task) method. From this future reference, you call the get() method to
fetch the result once the task has completed. If the task is still executing
when you call future.get(), the get() method blocks until the task execution
completes. Once the execution is complete, you need to release the
ExecutorService yourself by calling the shutdown() method; otherwise its worker
thread stays alive and keeps the program from exiting.

Now that you are familiar with the basic mechanism of how to execute tasks,
here’s a more complex example. Assume that your task is to find the sum of
numbers from 1 to N where N is a large number (a million in our case). Of
course, you can use the formula [(N * (N + 1)) / 2] to find the sum, and you’ll
use this formula to check whether the summation from 1..N is correct. However,
just for illustration, you’ll divide the range 1 to 1 million into ten
sub-ranges and use ten threads, each of which sums the numbers in one
sub-range.
import java.util.*;
import java.util.concurrent.*;

// We create a class SumOfN that sums the values from 1..N where N is a large number.
// We divide the task of summing the numbers among 10 threads (which is an
// arbitrary limit just for illustration).
// Once computation is complete, we add the results of all the threads,
// and check if the calculation is correct by using the formula (N * (N + 1))/2.
class SumOfN {
    private static final long N = 1_000_000L; // one million
    private static long calculatedSum = 0; // value to hold the sum of values in range 1..N
    private static final int NUM_THREADS = 10; // number of threads to create for distributing the effort

    // This Callable object sums numbers in range from..to
    static class SumCalc implements Callable<Long> {
        long from, to, localSum = 0;

        public SumCalc(long from, long to) {
            this.from = from;
            this.to = to;
        }

        public Long call() {
            // add in range 'from' .. 'to' inclusive of the value 'to'
            for (long i = from; i <= to; i++) {
                localSum += i;
            }
            return localSum;
        }
    }

    // In the main method we implement the logic to divide the summation tasks among
    // the given number of threads and finally check if the calculated sum is correct
    public static void main(String[] args) {
        // Divide the task among a fixed number of threads
        ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
        // store the references to the Future objects in a List for summing up together
        List<Future<Long>> summationTasks = new ArrayList<>();
        long nByTen = N / NUM_THREADS; // divide N by 10 so that it can be submitted as 10 tasks
        for (int i = 0; i < NUM_THREADS; i++) {
            // create a summation task
            // ranging from (N/10 * 0) + 1 .. (N/10 * 1) up to (N/10 * 9) + 1 .. (N/10 * 10)
            long fromInInnerRange = (nByTen * i) + 1;
            long toInInnerRange = nByTen * (i + 1);
            System.out.printf("Spawning thread for summing in range %d to %d %n",
                    fromInInnerRange, toInInnerRange);
            // Create a callable object for the given summation range
            Callable<Long> summationTask = new SumCalc(fromInInnerRange, toInInnerRange);
            // submit that task to the executor service
            Future<Long> futureSum = executorService.submit(summationTask);
            // it will take time to complete, so add it to the list to revisit later
            summationTasks.add(futureSum);
        }
        // no new tasks will be accepted; the tasks already submitted still run to completion
        executorService.shutdown();
        // now, find the sum from each task
        for (Future<Long> partialSum : summationTasks) {
            try {
                // the get() method will block (i.e., wait) until the computation is over
                calculatedSum += partialSum.get();
            } catch (CancellationException | ExecutionException | InterruptedException exception) {
                // unlikely that you get an exception - exit in case something goes wrong
                exception.printStackTrace();
                System.exit(-1);
            }
        }
        // now calculate the sum using formula (N * (N + 1))/2 without doing the hard work
        long formulaSum = (N * (N + 1)) / 2;
        // print the sum using the formula and the one calculated by the threads
        // they must be equal!
        System.out.printf("Sum by threads = %d, sum using formula = %d",
                calculatedSum, formulaSum);
    }
}
With N set to one million, as in the listing, the program prints:
Spawning thread for summing in range 1 to 100000
Spawning thread for summing in range 100001 to 200000
Spawning thread for summing in range 200001 to 300000
Spawning thread for summing in range 300001 to 400000
Spawning thread for summing in range 400001 to 500000
Spawning thread for summing in range 500001 to 600000
Spawning thread for summing in range 600001 to 700000
Spawning thread for summing in range 700001 to 800000
Spawning thread for summing in range 800001 to 900000
Spawning thread for summing in range 900001 to 1000000
Sum by threads = 500000500000, sum using formula = 500000500000

Let’s now analyze how this program works. In this program, you need to find the
sum of 1..N where N is one million (a large number). The class SumCalc
implements Callable<Long> to sum the values in the range 'from' to 'to'. The
call() method performs the actual computation by looping from 'from' to 'to'
and returns the intermediate sum as a Long value.

In this program, you divide the summation task among multiple threads. You can
determine the number of threads based on the number of cores available in your
processor; however, to keep the program simple, it uses ten threads.

In the main() method, you create a thread pool with ten threads. You are going
to create ten summation tasks, so you need a container to hold the references
to those tasks. You use an ArrayList to hold the Future<Long> references.

In the first for loop in main(), you create ten tasks and submit them to the
ExecutorService. As you submit a task, you get a Future<Long> reference and add
it to the ArrayList.

Once you’ve created the ten tasks, you traverse the array list in the next for
loop to get the results of the tasks. You add up the partial results of the
individual tasks to compute the final sum.

Once you get the computed sum of values from one to one million, you use the
simple formula N * (N + 1)/2 to find the formula sum. From the output, you can
see that the computed sum and the formula sum are equal, so you can ascertain
that your logic of dividing the tasks and combining the results worked
correctly.
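
The remark about sizing the pool from the number of cores can be sketched as follows. This is a hedged variation on the example, not the original program: the class SumByCoresSketch and the helper parallelSum() are hypothetical names, and the sketch uses invokeAll() to submit all tasks at once instead of calling submit() in a loop. Runtime.getRuntime().availableProcessors() reports the number of processors available to the JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SumByCoresSketch {
    // Hypothetical helper: sums 1..n by splitting the range into one chunk per worker.
    static long parallelSum(long n, int workers)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            long chunk = n / workers;
            for (int i = 0; i < workers; i++) {
                final long from = chunk * i + 1;
                // the last chunk absorbs the remainder when n is not divisible by workers
                final long to = (i == workers - 1) ? n : chunk * (i + 1);
                tasks.add(() -> {
                    long sum = 0;
                    for (long value = from; value <= to; value++) {
                        sum += value;
                    }
                    return sum;
                });
            }
            long total = 0;
            // invokeAll() blocks until every task has completed
            for (Future<Long> partial : executor.invokeAll(tasks)) {
                total += partial.get();
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        long n = 1_000_000L;
        System.out.printf("Using %d threads, sum = %d, formula = %d%n",
                cores, parallelSum(n, cores), n * (n + 1) / 2);
    }
}
```

Unlike the fixed division by ten, this version gives the last task the leftover values, so it also works when N is not a multiple of the thread count.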
ThreadFactory
ThreadFactory is an interface for creating threads on demand, so that code does
not have to create them explicitly by calling new Thread(). For example, assume
that you often create high-priority threads. You can create a
MaxPriorityThreadFactory that sets the priority of every thread it creates to
the maximum priority. The following example shows such a factory, together with
a Runnable and a test class that use it.
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public class MaxPriorityThreadFactory implements ThreadFactory {
    // counter used to name the threads; atomic because newThread() may be
    // called from several threads at once
    private static final AtomicLong count = new AtomicLong();

    @Override
    public Thread newThread(Runnable r) {
        Thread temp = new Thread(r);
        temp.setName("prioritythread" + count.getAndIncrement());
        temp.setPriority(Thread.MAX_PRIORITY);
        return temp;
    }
}

public class ARunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Running the created thread ");
    }
}

import java.util.concurrent.ThreadFactory;

public class TestThreadFactory {
    public static void main(String[] args) {
        ThreadFactory threadFactory = new MaxPriorityThreadFactory();
        // the factory creates and configures the thread; starting it is up to the caller
        Thread t1 = threadFactory.newThread(new ARunnable());
        System.out.println("The name of the thread is " + t1.getName());
        System.out.println("The priority of the thread is " + t1.getPriority());
        t1.start();
    }
}
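
A thread factory is most often handed to an executor rather than called directly. The following sketch assumes a lambda stand-in for the MaxPriorityThreadFactory above so that the block is self-contained; the class FactoryWithExecutorSketch, the helper describeWorker(), and the thread name "prioritythread-worker" are hypothetical. It relies on the Executors.newSingleThreadExecutor(ThreadFactory) overload, which uses the supplied factory whenever the executor needs a thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class FactoryWithExecutorSketch {
    // Hypothetical helper: returns "name/priority" of the pool thread that ran the task.
    static String describeWorker() throws InterruptedException, ExecutionException {
        // lambda stand-in for a factory class such as MaxPriorityThreadFactory
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("prioritythread-worker");
            thread.setPriority(Thread.MAX_PRIORITY);
            return thread;
        };
        // the executor asks the factory for its worker thread
        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        try {
            Callable<String> whoAmI = () -> {
                Thread current = Thread.currentThread();
                return current.getName() + "/" + current.getPriority();
            };
            return executor.submit(whoAmI).get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Task ran on " + describeWorker());
    }
}
```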
The ThreadLocalRandom Class
When you do concurrent programming, you’ll find that there is often a need to
generate random numbers. Using Math.random() is not efficient for concurrent
programming because all threads share a single generator and contend for it.
For this reason, the java.util.concurrent package provides the
ThreadLocalRandom class, which gives each thread its own generator and is
therefore suitable for use in concurrent programs. You can use
ThreadLocalRandom.current() and then call methods such as nextInt() and
nextFloat() to generate the random numbers.
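
As a small illustration of the calls named above, the following sketch rolls a die from several threads. The class ThreadLocalRandomSketch and the helper rollDie() are hypothetical names chosen for this example; nextInt(origin, bound) returns a value from origin (inclusive) to bound (exclusive).

```java
import java.util.concurrent.ThreadLocalRandom;

public class ThreadLocalRandomSketch {
    // Hypothetical helper: rolls a six-sided die using the calling thread's own generator.
    static int rollDie() {
        // current() must be called on the thread that uses the generator
        return ThreadLocalRandom.current().nextInt(1, 7);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] players = new Thread[3];
        for (int i = 0; i < players.length; i++) {
            players[i] = new Thread(() ->
                    System.out.println(Thread.currentThread().getName()
                            + " rolled " + rollDie()));
            players[i].start();
        }
        // wait for all players to finish
        for (Thread player : players) {
            player.join();
        }
    }
}
```

Calling ThreadLocalRandom.current() at the point of use, rather than storing the instance in a shared field, keeps each generator confined to its own thread.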