Monday 31 December 2018

Callable, Executors, ExecutorService, ThreadPool, and Future


Callable is an interface that declares only one method: call(). Its full signature is V call() throws Exception. It represents a task that is run by a thread and, once complete, returns a value of type V. If the call() method cannot compute a result, it throws an Exception.

To execute a task using the Callable object, you first create a thread pool. A thread pool is a collection of threads that can execute tasks. You create a thread pool using the Executors utility class. This class provides methods to get instances of thread pools, thread factories, etc.

The ExecutorService interface extends the Executor interface and adds services such as shutting down the executor and producing Future objects. Some tasks take considerable time to complete, so when you submit a task to the executor service, you immediately get back a Future object that stands in for the eventual result.
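The termination services mentioned above can be sketched as follows. This is a minimal illustration, not a prescribed pattern: the class name ShutdownSketch, the helper shutdownGracefully, and the five-second timeout are hypothetical choices. It only uses shutdown(), awaitTermination(), shutdownNow(), and isTerminated() from ExecutorService.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timeout values are illustrative only.
class ShutdownSketch {

    // Stops accepting new tasks, waits for running ones, and forces a stop on timeout.
    // Returns true if the executor terminated within the timeout.
    static boolean shutdownGracefully(ExecutorService es, long timeoutSeconds) {
        es.shutdown(); // already-submitted tasks still run; new submissions are rejected
        try {
            if (!es.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                es.shutdownNow(); // interrupts tasks that are still running
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.execute(() -> System.out.println("task ran on " + Thread.currentThread().getName()));
        boolean clean = shutdownGracefully(es, 5);
        System.out.println("terminated cleanly: " + clean + ", isTerminated: " + es.isTerminated());
    }
}
```

Calling shutdown() alone does not wait for tasks to finish; awaitTermination() is what blocks until they do or the timeout expires.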

Future represents the result of a task that becomes available later (i.e., the value arrives once the task finishes in the “future”). You can use the isDone() method of the Future interface to check whether the task is complete and then use the get() method to fetch the task result. If you call the get() method while the task is not yet complete, the method blocks until the task completes and then returns the value.
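A small sketch of the isDone()/get() interplay described above. The class name FuturePollingSketch, the helper runSlowTask, the return value 42, and the sleep durations are all assumptions made for illustration; the timed get(long, TimeUnit) overload is used only as a safety net.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the task and its timings are illustrative only.
class FuturePollingSketch {

    // Submits a slow task, polls isDone() while it runs, then fetches the result.
    static int runSlowTask() throws Exception {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> slowTask = () -> {
                Thread.sleep(200); // simulate work
                return 42;
            };
            Future<Integer> future = es.submit(slowTask);

            // isDone() never blocks; it only reports the current state of the task.
            while (!future.isDone()) {
                System.out.println("task still running, doing other work...");
                Thread.sleep(50);
            }
            // The task has finished, so get() returns without waiting.
            return future.get(1, TimeUnit.SECONDS);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("result = " + runSlowTask());
    }
}
```

Polling is only worthwhile when the calling thread has other work to do in the meantime; otherwise a plain get() is simpler.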

import java.util.concurrent.Callable;

public class Factorial implements Callable<Long> {

    long n;

    public Factorial(long n) {
        this.n = n;
    }

    public Long call() throws Exception {
        if (n <= 0) {
            throw new Exception("for finding factorial, N should be > 0");
        }
        long fact = 1;
        for (long longVal = 1; longVal <= n; longVal++) {
            fact *= longVal;
        }
        return fact;
    }

}


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableTest {

    public static void main(String[] args) throws Exception {
        // the value for which we want to find the factorial
        long N = 4;
        // get a callable task to be submitted to the executor service
        Callable<Long> task = new Factorial(N);
        // create an ExecutorService backed by a single worker thread
        ExecutorService es = Executors.newSingleThreadExecutor();
        // submit the task to the executor service and store the Future object
        Future<Long> future = es.submit(task);
        // get() blocks until the computation is complete
        System.out.printf("factorial of %d is %d", N, future.get());
        // done. shutdown the executor service since we don't need it anymore
        if (!es.isShutdown()) {
            es.shutdown();
        }
        System.out.println(" and Service is shutdown");
    }
}


In this program, you have a Factorial class that implements Callable. Since the task is to compute the factorial of a number N, the task needs to return a result. You use the Long type for the factorial value, so you implement Callable<Long>. Inside the Factorial class, you define the call() method that actually performs the task (here, computing the factorial of the given number). If the given value N is negative or zero, you don’t perform the task and throw an exception instead; the caller sees it wrapped in an ExecutionException when it calls get() on the Future. Otherwise, you loop from 1 to N and compute the factorial value.
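To illustrate how a failure inside call() reaches the code that submitted the task, here is a minimal sketch. The class FailingTaskSketch and its helpers factorialOf and failureMessage are hypothetical; the lambda merely mirrors the Factorial task so that the sketch compiles on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it mirrors the Factorial task as a lambda.
class FailingTaskSketch {

    static Callable<Long> factorialOf(long n) {
        return () -> {
            if (n <= 0) {
                throw new Exception("for finding factorial, N should be > 0");
            }
            long fact = 1;
            for (long i = 1; i <= n; i++) {
                fact *= i;
            }
            return fact;
        };
    }

    // Returns the message of the exception thrown inside call(), or null on success.
    static String failureMessage(long n) throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Future<Long> future = es.submit(factorialOf(n));
            future.get(); // rethrows a task failure as ExecutionException
            return null;
        } catch (ExecutionException e) {
            // getCause() is the original exception thrown by call()
            return e.getCause().getMessage();
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("n = 4  -> " + failureMessage(4));
        System.out.println("n = -1 -> " + failureMessage(-1));
    }
}
```

The exception does not surface at submit(); it is stored in the Future and only appears when get() is called.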

In the CallableTest class, you first create an instance of the Factorial class. You then need to execute this task. 

For the sake of simplicity, you get a single-threaded executor by calling the newSingleThreadExecutor() method in the Executors class. Note that you could use other methods such as newFixedThreadPool(nThreads) to create a thread pool with multiple threads, depending on the level of parallelism you need.

Once you get an ExecutorService, you submit the task for execution. ExecutorService abstracts details such as when the task is executed and how the task is assigned to the threads. You get a reference to Future<Long> when you call the submit(task) method. From this future reference, you call the get() method to fetch the result once the task has completed. If the task is still executing when you call future.get(), the get() method blocks until the task completes. Once you no longer need the ExecutorService, you must release it manually by calling the shutdown() method.

Now that you are familiar with the basic mechanism of how to execute tasks, here’s a more complex example. Assume that your task is to find the sum of numbers from 1 to N where N is a large number (a million in our case). Of course, you can use the formula (N * (N + 1)) / 2 to find the sum, and you’ll use this formula to check whether the summation from 1 to N is correct. However, just for illustration, you’ll divide the range 1 to 1 million into ten sub-ranges and spawn ten threads, each summing the numbers in one sub-range.

import java.util.*;
import java.util.concurrent.*;

// We create a class SumOfN that sums the values from 1..N where N is a large number.
// We divide the task of summing the numbers among 10 threads (an arbitrary limit just for illustration).
// Once computation is complete, we add the results of all the threads,
// and check if the calculation is correct by using the formula (N * (N + 1))/2.
class SumOfN {
    // one million
    private static long N = 1_000_000L;

    // value to hold the sum of values in range 1..N
    private static long calculatedSum = 0;

    // number of threads to create for distributing the effort
    private static final int NUM_THREADS = 10;

    // This Callable object sums numbers in range from..to
    static class SumCalc implements Callable<Long> {
        long from, to, localSum = 0;

        public SumCalc(long from, long to) {
            this.from = from;
            this.to = to;
        }

        public Long call() {
            // add in range 'from' .. 'to' inclusive of the value 'to'
            for (long i = from; i <= to; i++) {
                localSum += i;
            }
            return localSum;
        }
    }
    // In the main method we implement the logic to divide the summation tasks among
    // the given number of threads and finally check if the calculated sum is correct
    public static void main(String[] args) {
        // Divide the task among a fixed number of threads
        ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
        // store the references to the Future objects in a List for summing up together
        List<Future<Long>> summationTasks = new ArrayList<>();
        long nByTen = N / NUM_THREADS; // divide N by 10 so that it can be submitted as 10 tasks
        for (int i = 0; i < NUM_THREADS; i++) {
            // create a summation task for the range (N/10 * i) + 1 .. N/10 * (i + 1),
            // i.e. from 1 .. N/10 for the first task up to (N/10 * 9) + 1 .. N for the last
            long fromInInnerRange = (nByTen * i) + 1;
            long toInInnerRange = nByTen * (i + 1);
            System.out.printf("Spawning thread for summing in range %d to %d %n",
                    fromInInnerRange, toInInnerRange);
            // Create a callable object for the given summation range
            Callable<Long> summationTask =
                    new SumCalc(fromInInnerRange, toInInnerRange);
            // submit that task to the executor service
            Future<Long> futureSum = executorService.submit(summationTask);
            // it will take time to complete, so add it to the list to revisit later
            summationTasks.add(futureSum);
        }

        // no more tasks will be submitted; the tasks already submitted still run to completion
        executorService.shutdown();

        // now, find the sum from each task
        for (Future<Long> partialSum : summationTasks) {
            try {
                // the get() method will block (i.e., wait) until the computation is over
                calculatedSum += partialSum.get();
            } catch (CancellationException | ExecutionException
                    | InterruptedException exception) {
                // unlikely that you get an exception - exit in case something goes wrong
                exception.printStackTrace();
                System.exit(-1);
            }
        }
        // now calculate the sum using formula (N * (N + 1))/2 without doing the hard work
        long formulaSum = (N * (N + 1)) / 2;
        // print the sum using the formula and the one calculated piece by piece
        // they must be equal!
        System.out.printf("Sum by threads = %d, sum using formula = %d%n",
                calculatedSum, formulaSum);
    }
}
/*
Spawning thread for summing in range 1 to 100000
Spawning thread for summing in range 100001 to 200000
Spawning thread for summing in range 200001 to 300000
Spawning thread for summing in range 300001 to 400000
Spawning thread for summing in range 400001 to 500000
Spawning thread for summing in range 500001 to 600000
Spawning thread for summing in range 600001 to 700000
Spawning thread for summing in range 700001 to 800000
Spawning thread for summing in range 800001 to 900000
Spawning thread for summing in range 900001 to 1000000
Sum by threads = 500000500000, sum using formula = 500000500000

Let’s now analyze how this program works. In this program, you need to find the sum of 1..N where N is one
million (a large number). The class SumCalc implements Callable<Long> to sum the values in the range 'from' to
'to'. The call() method performs the actual computation by looping from 'from' to 'to' and returns the
partial sum as a Long value.
In this program, you divide the summation task among multiple threads. You can determine the number of
threads based on the number of cores available in your processor; however, to keep the program
simple, it uses ten threads.
In the main() method, you create a thread pool with ten threads. You are going to create ten summation tasks, so
you need a container to hold the references to their results. Use an ArrayList to hold the Future<Long> references.
In the first for loop in main(), you create ten tasks and submit them to the ExecutorService. As you submit a
task, you get a Future<Long> reference and you add it to the ArrayList.
Once you’ve created the ten tasks, you traverse the array list in the next for loop to get the results of the tasks. You
sum up the partial results of the individual tasks to compute the final sum.
Once you get the computed sum of values from one to one million, you use the simple formula N * (N + 1)/2
to find the formula sum. From the output, you can see that the computed sum and the formula sum are equal, so you
can ascertain that your logic of dividing the tasks and combining the results of the tasks worked correctly.*/
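The remark about sizing the pool from the number of cores can be sketched like this. It is a variation on the program above, not the original code: the class SumWithInvokeAllSketch and the helper parallelSum are hypothetical, Runtime.getRuntime().availableProcessors() supplies the core count, and invokeAll() replaces the manual submit loop. The last task absorbs any remainder so that N need not be divisible by the number of tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical variation of SumOfN; names are illustrative only.
class SumWithInvokeAllSketch {

    // Sums 1..n by splitting the range into numTasks contiguous sub-ranges.
    static long parallelSum(long n, int numTasks)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            long chunk = n / numTasks;
            for (int i = 0; i < numTasks; i++) {
                final long from = chunk * i + 1;
                // the last task runs up to n, covering any remainder
                final long to = (i == numTasks - 1) ? n : chunk * (i + 1);
                tasks.add(() -> {
                    long local = 0;
                    for (long v = from; v <= to; v++) {
                        local += v;
                    }
                    return local;
                });
            }
            long total = 0;
            // invokeAll() submits every task and returns once all have completed
            for (Future<Long> partial : pool.invokeAll(tasks)) {
                total += partial.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        long n = 1_000_000L;
        System.out.printf("cores = %d, sum by threads = %d, sum using formula = %d%n",
                cores, parallelSum(n, cores), n * (n + 1) / 2);
    }
}
```

For CPU-bound work such as this summation, more threads than cores rarely helps, which is why the core count is a common starting point.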

ThreadFactory
ThreadFactory is an interface for creating threads on demand, so that code does not have to call new Thread() explicitly. For example, assume that you often create high-priority threads. You can create a MaxPriorityThreadFactory that sets the priority of every thread it creates to the maximum.
The example below shows this standard approach: the factory names each thread and raises its priority before returning it.




// MaxPriorityThreadFactory.java
import java.util.concurrent.ThreadFactory;

public class MaxPriorityThreadFactory implements ThreadFactory {
    // shared by all factory instances; not synchronized, so only safe
    // when newThread() is called from one thread at a time
    private static long count = 0;

    public Thread newThread(Runnable r) {
        Thread temp = new Thread(r);
        temp.setName("prioritythread" + count++);
        temp.setPriority(Thread.MAX_PRIORITY);
        return temp;
    }
}

// ARunnable.java
public class ARunnable implements Runnable {
    public void run() {
        System.out.println("Running the created thread ");
    }
}

// TestThreadFactory.java
import java.util.concurrent.ThreadFactory;

public class TestThreadFactory {
    public static void main(String[] args) {
        ThreadFactory threadFactory = new MaxPriorityThreadFactory();
        Thread t1 = threadFactory.newThread(new ARunnable());
        System.out.println("The name of the thread is " + t1.getName());
        System.out.println("The priority of the thread is " + t1.getPriority());
        t1.start();
    }
}
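A thread factory becomes more useful when it is handed to a thread pool, which then creates all its worker threads through it. The sketch below assumes the Executors.newFixedThreadPool(int, ThreadFactory) overload; the class FactoryBackedPoolSketch and its nested NamedMaxPriorityFactory are hypothetical and use an AtomicLong counter, since a pool may request threads concurrently.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; names are illustrative only.
class FactoryBackedPoolSketch {

    // Same idea as MaxPriorityThreadFactory, with a thread-safe counter.
    static class NamedMaxPriorityFactory implements ThreadFactory {
        private final AtomicLong count = new AtomicLong();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "prioritythread" + count.getAndIncrement());
            t.setPriority(Thread.MAX_PRIORITY);
            return t;
        }
    }

    // Runs one task on a factory-backed pool and reports which thread ran it.
    static String workerDescription() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, new NamedMaxPriorityFactory());
        try {
            Callable<String> describe = () ->
                    Thread.currentThread().getName()
                            + " priority=" + Thread.currentThread().getPriority();
            Future<String> result = pool.submit(describe);
            return result.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("task ran on " + workerDescription());
    }
}
```

The task itself never mentions the factory; naming and priority are applied once, where the pool is constructed.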

The ThreadLocalRandom Class
When you do concurrent programming, you’ll often need to generate random numbers. Using Math.random() is not efficient for this, because all threads share a single underlying generator and contend for it. For this reason, the java.util.concurrent package provides the ThreadLocalRandom class, which gives each thread its own generator. You call ThreadLocalRandom.current() and then methods such as nextInt() and nextFloat() to generate the random numbers.
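A minimal sketch of ThreadLocalRandom in use. The class ThreadLocalRandomSketch and the die-rolling helper rollDie are hypothetical; the two-argument nextInt(origin, bound) treats the origin as inclusive and the bound as exclusive.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative only.
class ThreadLocalRandomSketch {

    // Returns a value from 1 to 6 using the calling thread's own generator.
    static int rollDie() {
        return ThreadLocalRandom.current().nextInt(1, 7);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            // each worker thread calls current() itself, so no generator is shared
            pool.execute(() -> System.out.println(
                    Thread.currentThread().getName() + " rolled " + rollDie()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

ThreadLocalRandom.current() should be called inside the thread that uses the numbers rather than stored in a field shared between threads.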
