Friday 28 December 2018

Concurrency in Java Tutorial - Java Multithreading Tutorial

Concurrency: Java provides the java.util.concurrent package and its subpackages for implementing concurrency. With them you can:


  • Use java.util.concurrent collections
  • Apply atomic variables and locks
  • Use Executors and ThreadPools
  • Use the parallel Fork/Join Framework

From the beginning, Java supported concurrency in the form of low-level thread management, locks, synchronization, and APIs for concurrency. We covered them in the preceding blog in our discussion of the Thread class, Runnable interface, and synchronized keyword.
Since 5.0, Java also supports high-level concurrency APIs in its java.util.concurrent package. In this blog, we’ll focus on these APIs for concurrent programming. These high-level APIs exploit today’s multi-core hardware, in which a single processor has multiple cores. These APIs are also useful for exploiting concurrency in machines that support multiple processors.
Most of the Java concurrency utilities are provided in the java.util.concurrent package. Classes to efficiently update shared variables without using locks are provided in the java.util.concurrent.atomic subpackage. The Lock interface and the classes deriving from it are provided in the java.util.concurrent.locks subpackage.
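As a rough illustration of the three packages just mentioned, the following sketch increments one counter through an AtomicInteger and another under a ReentrantLock, with the work spread over a small thread pool. The class name PackageTour, the count() helper, and the task counts are made up for this example, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class (not from the original post).
class PackageTour {
    // Runs `tasks` tasks on a small pool; each task bumps both counters `perTask` times.
    // Returns {atomic total, lock-protected total}.
    static int[] count(int tasks, int perTask) {
        AtomicInteger atomicCounter = new AtomicInteger();      // java.util.concurrent.atomic
        Lock lock = new ReentrantLock();                        // java.util.concurrent.locks
        int[] lockedCounter = {0};                              // plain int, guarded by lock
        ExecutorService pool = Executors.newFixedThreadPool(4); // java.util.concurrent

        for (int t = 0; t < tasks; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perTask; i++) {
                    atomicCounter.incrementAndGet(); // atomic update, no lock needed
                    lock.lock();
                    try {
                        lockedCounter[0]++;          // safe only while holding the lock
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }

        pool.shutdown(); // accept no new tasks; already submitted ones still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return new int[] { atomicCounter.get(), lockedCounter[0] };
    }

    public static void main(String[] args) {
        int[] totals = count(8, 1000);
        System.out.println("atomic=" + totals[0] + ", locked=" + totals[1]);
        // prints: atomic=8000, locked=8000
    }
}
```

Both counters end up with the same total. The atomic version is shorter, while the lock version generalizes to updates that touch more than one variable.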

Using java.util.concurrent Collections

There are many classes in the java.util.concurrent package that provide high-level APIs for concurrent programming. In this section, we will mainly discuss synchronizer classes provided in this package. Following that, we will briefly cover the important concurrent collection classes provided in the java.util.concurrent package.
You already understand the low-level concurrency constructs (such as the use of the synchronized keyword, Runnable interface, and Thread class for creating threads) from the preceding blog. In the case of a shared resource that needs to be accessed by multiple threads, access and modifications to the shared resource need to be protected. When you use the synchronized keyword, you employ mutexes to synchronize between threads for safe shared access. Threads also often need to coordinate their execution to complete a bigger, higher-level task. The wait/notify pattern discussed in the preceding blog is one way to coordinate the execution of multiple threads.
Using APIs for acquiring and releasing locks (using mutexes) or invoking the wait/notify methods on locks are low-level tasks. It is possible to build higher-level abstractions for thread synchronization. These high-level abstractions for synchronizing activities of two or more threads are known as synchronizers. Synchronizers internally make use of the existing low-level APIs for thread coordination.

The synchronizers provided in the java.util.concurrent library and their uses are listed here:

• Semaphore controls access to one or more shared resources.
• Phaser is used to support a synchronization barrier.
• CountDownLatch allows threads to wait for a countdown to complete.
• Exchanger supports exchanging data between two threads.
• CyclicBarrier enables threads to wait at a predefined execution point.

Now, we’ll discuss each of these synchronizers in turn with the help of examples.

Semaphore
A semaphore controls access to shared resources. A semaphore maintains a counter to specify the number of resources that the semaphore controls. Access to the resource is allowed if the counter is greater than zero, while a zero value of the counter indicates that no resource is available at the moment and so the access is denied.
The methods acquire() and release() are for acquiring and releasing resources from a semaphore. If a thread calls acquire() and the counter is zero (i.e., resources are unavailable), the thread waits until the counter is non-zero and then gets the resource for use. Once the thread is done using the resource, it calls release() to increment the resource availability counter.
Note that if the number of resources is 1, then at a given time only one thread can access the resource; in this case, using the semaphore is similar to using a lock.
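To make the "similar to using a lock" remark concrete, here is a minimal sketch in which a one-permit semaphore guards a shared counter. The class name BinarySemaphoreDemo and the depositAll() helper are invented for this illustration. One difference from a real lock is worth keeping in mind: a semaphore has no notion of ownership, so any thread may call release(), not only the one that acquired the permit.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class (not from the original post).
class BinarySemaphoreDemo {
    // Starts `threads` threads; each adds 1 to a shared balance `depositsPerThread` times.
    static int depositAll(int threads, int depositsPerThread) {
        Semaphore mutex = new Semaphore(1); // one permit: at most one thread inside
        int[] balance = {0};                // shared state guarded by the semaphore
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < depositsPerThread; i++) {
                    mutex.acquireUninterruptibly(); // like lock()
                    try {
                        balance[0]++;
                    } finally {
                        mutex.release();            // like unlock()
                    }
                }
            });
            workers[t].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every depositor to finish
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ie);
            }
        }
        return balance[0];
    }

    public static void main(String[] args) {
        System.out.println(depositAll(4, 1000)); // prints: 4000
    }
}
```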

Semaphore(int permits)
Constructor to create Semaphore objects with a given number of permits
(the number of threads that can access the resource at a time). If the
permits value is negative, the given number of release() calls must
happen before acquire() calls can succeed.

Semaphore(int permits, boolean fair)
Same as the previous constructor, but this extra fair option indicates that
the permits should be allotted on a first-come, first-served basis.

void acquire()
void acquire(int permits)
Acquires a permit if available; otherwise, it blocks until a permit becomes
available. Can throw an InterruptedException if some other thread
interrupts it while waiting to acquire a permit. The overloaded version
takes a number of permits as an argument.

void acquireUninterruptibly()
Same as the acquire() method, but this thread cannot be interrupted
while waiting to acquire a permit.

boolean tryAcquire()
boolean tryAcquire(long timeout, TimeUnit unit)
Acquires a permit from the semaphore if available at the time of the call
and returns true; if unavailable, it returns false immediately (without
blocking). The overloaded tryAcquire() method additionally takes a
time-out argument: the thread blocks for up to the given time-out period
waiting for a permit and returns false if none becomes available in time.
The timed version can throw an InterruptedException.

void release()
void release(int permits)
Releases a permit from the semaphore. The overloaded version specifies
the number of permits to release.
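The non-blocking methods listed above are easy to try out from a single thread. The sketch below is only an illustration: the class name TryAcquireDemo and the probe() helper are invented here, and availablePermits() is an additional Semaphore method that is not in the list above.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (not from the original post).
class TryAcquireDemo {
    // Returns a comma-separated record of what each call reported.
    static String probe() {
        Semaphore permits = new Semaphore(1);
        StringBuilder log = new StringBuilder();

        log.append(permits.tryAcquire());             // true: the only permit is taken
        log.append(',').append(permits.tryAcquire()); // false: returns at once, no blocking
        try {
            // waits up to 100 ms for a permit, then gives up and returns false
            log.append(',').append(permits.tryAcquire(100, TimeUnit.MILLISECONDS));
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log.append(",interrupted");
        }
        permits.release();                            // hand the permit back
        log.append(',').append(permits.availablePermits());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(probe()); // prints: true,false,false,1
    }
}
```

tryAcquire() is handy when a thread has something else useful to do instead of waiting, for example trying a different resource.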

java multithreading programs

Let’s assume that there are two ATM machines available in an ATM room. Therefore, only two people are
allowed in the room at a time. There are five people waiting outside to use the ATM machines. The situation can be
simulated by the following code, in which each ATM machine is treated as a resource controlled by a semaphore.

import java.util.concurrent.Semaphore;

// This class simulates a situation where an ATM room has only two ATM machines
// and five people are waiting to access them. Since only one person can use
// an ATM machine at a given time, the others wait for their turn.
class ATMRoom {
    public static void main(String[] args) {
        // assume that only two ATM machines are available in the ATM room
        Semaphore machines = new Semaphore(2);
        // list of people waiting to access the machines
        new Person(machines, "Mickey");
        new Person(machines, "Donald");
        new Person(machines, "Tom");
        new Person(machines, "Jerry");
        new Person(machines, "Casper");
    }
}

// Each Person is an independent thread, but their access to the common resource
// (two ATM machines in the ATM room in this case) needs to be synchronized.
class Person extends Thread {
    private final Semaphore machines;

    public Person(Semaphore machines, String name) {
        this.machines = machines;
        this.setName(name);
        this.start();
    }

    @Override
    public void run() {
        try {
            System.out.println(getName() + " waiting to access an ATM machine");
            machines.acquire();
            try {
                System.out.println(getName() + " is accessing an ATM machine");
                Thread.sleep(1000); // simulate the time required for withdrawing an amount
                System.out.println(getName() + " is done using the ATM machine");
            } finally {
                // always return the permit, even if interrupted while sleeping
                machines.release();
            }
        } catch (InterruptedException ie) {
            System.err.println(ie);
        }
    }
}

Here is the output of the program in one sample run:
Mickey waiting to access an ATM machine
Tom waiting to access an ATM machine
Jerry waiting to access an ATM machine
Donald waiting to access an ATM machine
Casper waiting to access an ATM machine
Tom is accessing an ATM machine
Mickey is accessing an ATM machine
Tom is done using the ATM machine
Mickey is done using the ATM machine
Jerry is accessing an ATM machine
Donald is accessing an ATM machine
Donald is done using the ATM machine
Jerry is done using the ATM machine
Casper is accessing an ATM machine
Casper is done using the ATM machine

Now let’s analyze how this program works. People waiting to access an ATM machine are simulated by the
Person class, which extends Thread. The run() method in the Person class acquires a permit from the semaphore, simulates withdrawing
money from the ATM machine, and releases the permit.
The main() method simulates an ATM room with two ATM machines by creating a Semaphore object with
two permits. People waiting in the queue to access an ATM machine are modeled by passing this shared
Semaphore object to each Person thread.
As you can see from the program output, the semaphore allows only two threads at a time and the other threads
keep waiting. When a thread releases a permit, another thread acquires it. Cool, isn’t it?

CountDownLatch
This synchronizer allows one or more threads to wait for a countdown to complete. The countdown could be for a set of events to happen or for a set of operations being performed in other threads to complete. The important methods in this class are listed below.

CountDownLatch(int count)
Creates an instance of CountDownLatch with the number of times the
countDown() method must be called before the threads waiting with await()
can continue execution.

void await()
If the current count in the CountDownLatch object is zero, it immediately returns;
otherwise, the thread blocks until the countdown reaches zero. Can throw an
InterruptedException.

boolean await(long timeout, TimeUnit unit)
Same as the previous method, await(), but takes an additional time-out
argument. If the thread returns successfully after the count reaches zero, this
method returns true; if the thread returns because of time-out, it returns false.

void countDown()
Reduces the number of counts by one in this CountDownLatch object. If the
count reaches zero, all the (a)waiting threads are released. If the current count
is already zero, nothing happens.

long getCount()
Returns the pending counts in this CountDownLatch object.

When you create a CountDownLatch, you initialize it with an integer, which represents a count value. Threads would wait (by calling the await() method) for this count to reach zero. Once zero is reached, all threads are released; any other calls to await() would return immediately since the count is already zero. The counter value can be decremented by one by calling the countDown() method. You can get the current value of the counter using the getCount() method.
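The other common use mentioned earlier, waiting until operations in other threads complete, can be sketched as follows. Here the latch is counted down by the workers and awaited by the caller, using the timed await() so that a stuck worker cannot block the caller forever. The class name WaitForWorkers, the runWorkers() helper, and the five-second limit are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class (not from the original post).
class WaitForWorkers {
    // Starts `workers` threads and waits for all of them to finish.
    // Returns how many completed, or -1 if the wait timed out or was interrupted.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // stand-in for the real work
                } finally {
                    done.countDown();            // count down even if the work fails
                }
            }).start();
        }

        try {
            // true if the count reached zero, false if five seconds elapsed first
            boolean finished = done.await(5, TimeUnit.SECONDS);
            return finished ? completed.get() : -1;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("workers completed: " + runWorkers(3));
    }
}
```

Calling countDown() in a finally block is a defensive choice: if a worker threw before counting down, the waiting thread would otherwise sit there until the time-out.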

import java.util.concurrent.*;

// This class simulates the start of a running race by counting down from 5. It holds
// three runner threads ready at the start line of the race, and once the countdown
// reaches zero, all three runners start running.
class RunningRaceStarter {
    public static void main(String[] args) throws InterruptedException {
        // count from 5 to 0 and then start the race
        CountDownLatch counter = new CountDownLatch(5);
        // instantiate three runner threads
        new Runner(counter, "Carl");
        new Runner(counter, "Joe");
        new Runner(counter, "Jack");
        System.out.println("Starting the countdown ");
        long countVal = counter.getCount();
        while (countVal > 0) {
            Thread.sleep(1000); // 1000 milliseconds = 1 second
            System.out.println(countVal);
            if (countVal == 1) {
                // once counter.countDown() in the next statement is called,
                // the countdown will reach zero; so shout "Start"
                System.out.println("Start");
            }
            counter.countDown(); // count down by 1 for each second
            countVal = counter.getCount();
        }
    }
}

// This Runner class simulates a track runner in a 100-meter dash race. The runner waits until the
// countdown timer gets to zero and then starts running.
class Runner extends Thread {
    private CountDownLatch timer;

    public Runner(CountDownLatch cdl, String name) {
        timer = cdl;
        this.setName(name);
        System.out.println(this.getName() + " ready and waiting for the count down to start");
        start();
    }

    public void run() {
        try {
            // wait for the timer countdown to reach 0
            timer.await();
        } catch (InterruptedException ie) {
            System.err.println("interrupted -- can't start running the race");
        }
        System.out.println(this.getName() + " started running");
    }
}

This program prints the following:
Carl ready and waiting for the count down to start
Joe ready and waiting for the count down to start
Jack ready and waiting for the count down to start
Starting the countdown
5
4
3
2
1
Start
Joe started running
Carl started running
Jack started running

Let’s consider how the program works. The class Runner simulates a runner in a running race waiting to start running. It waits for the race to start by calling the await() method on the CountDownLatch object passed through the constructor.
The RunningRaceStarter class creates a CountDownLatch object. This counter object is initialized with the count value 5, which means the countdown is from 5 to 0. In the main() method, you create Runner objects; these three threads wait on the counter object. For each second, you call the countDown() method, which decrements the count by 1.
Once the count reaches zero, all three waiting threads are released and they automatically continue execution.
Note: In this program, the sequence in which Joe, Carl, or Jack is printed cannot be predicted since it depends on thread scheduling. So, if you run this program, you may get these three names printed in some other order.

Exchanger
The Exchanger class is meant for exchanging data between two threads. What Exchanger does is very simple: it waits until both threads have called the exchange() method. When both threads have called exchange(), the Exchanger object swaps the data that the threads passed in, so each thread receives the other’s object. This class is useful when two threads need to synchronize with each other and continuously exchange data.
This is a tiny class with only one method: exchange(). Note that this exchange() method has an overloaded form that takes a time-out period as an argument.
The following example simulates silly talk between the Java Duke mascot and the coffee shop. The two threads, DukeThread and CoffeeShopThread, run independently. However, for a chat to happen, each needs to listen when the other is talking. An Exchanger object provides a means for them to talk to each other.

import java.util.concurrent.Exchanger;

// The DukeThread class runs as an independent thread. It talks to the CoffeeShopThread that
// also runs independently. The chat is achieved by exchanging messages through a common
// Exchanger<String> object that synchronizes the chat between them.
// Note that the messages printed are the "responses" received from CoffeeShopThread.
class DukeThread extends Thread {
    private Exchanger<String> sillyTalk;

    public DukeThread(Exchanger<String> args) {
        sillyTalk = args;
    }

    public void run() {
        String reply = null;
        try {
            // start the conversation with CoffeeShopThread
            reply = sillyTalk.exchange("Knock knock!");
            // now, print the response received from CoffeeShopThread
            System.out.println("CoffeeShop: " + reply);
            // exchange another set of messages
            reply = sillyTalk.exchange("Duke");
            // now, print the response received from CoffeeShopThread
            System.out.println("CoffeeShop: " + reply);
            // an exchange can happen only when both send and receive happen;
            // since this is the last sentence to speak, we close the chat by
            // ignoring the "dummy" reply
            reply = sillyTalk.exchange("The one who was born in this coffee shop!");
            // talk over, so ignore the reply!
        } catch (InterruptedException ie) {
            System.err.println("Got interrupted during my silly talk");
        }
    }
}

class CoffeeShopThread extends Thread {
    private Exchanger<String> sillyTalk;

    public CoffeeShopThread(Exchanger<String> args) {
        sillyTalk = args;
    }

    public void run() {
        String reply = null;
        try {
            // exchange the first messages
            reply = sillyTalk.exchange("Who's there?");
            // print what Duke said
            System.out.println("Duke: " + reply);
            // exchange the second message
            reply = sillyTalk.exchange("Duke who?");
            // print what Duke said
            System.out.println("Duke: " + reply);
            // there is no message to send, but to get a message from the Duke thread,
            // both ends should send a message; so send a "dummy" string
            reply = sillyTalk.exchange("");
            System.out.println("Duke: " + reply);
        } catch (InterruptedException ie) {
            System.err.println("Got interrupted during my silly talk");
        }
    }
}

// Coordinate the silly talk between Duke and CoffeeShop by instantiating the Exchanger object
// and the CoffeeShop and Duke threads
class KnockKnock {
    public static void main(String[] args) {
        Exchanger<String> sillyTalk = new Exchanger<String>();
        new CoffeeShopThread(sillyTalk).start();
        new DukeThread(sillyTalk).start();
    }
}
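In one sample run, the program prints the following (the two lines printed after each exchange come from different threads, so they may appear in either order):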
Duke: Knock knock!
CoffeeShop: Who's there?
Duke: Duke
CoffeeShop: Duke who?
Duke: The one who was born in this coffee shop!

The comments inside the program explain how the program works. The main concept to understand with this example is that Exchanger helps coordinate (i.e., synchronize) exchanging messages between two threads. Both the threads wait for each other and use the exchange() method to exchange messages.
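The timed form of exchange() mentioned earlier can be sketched like this. If no partner shows up within the time-out, the call throws a TimeoutException instead of waiting forever. The class name ExchangeWithTimeout, the offer() helper, and the millisecond values are invented for this illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class (not from the original post).
class ExchangeWithTimeout {
    // Offers `message` and returns what the partner handed over,
    // or a marker string if nobody arrived in time.
    static String offer(Exchanger<String> exchanger, String message, long millis) {
        try {
            return exchanger.exchange(message, millis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
            return "timed out";
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();

        // No other thread is exchanging, so this gives up after 200 ms.
        System.out.println(offer(exchanger, "hello?", 200)); // prints: timed out

        // With a partner thread, the same call normally completes with the partner's message.
        Thread partner = new Thread(() -> offer(exchanger, "pong", 2000));
        partner.start();
        System.out.println(offer(exchanger, "ping", 2000));
        partner.join();
    }
}
```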


CyclicBarrier
There are many situations in concurrent programming where threads may need to wait at a predefined execution
point until all other threads reach that point. CyclicBarrier helps provide such a synchronization point. Its important methods are listed below.

CyclicBarrier(int parties)
Creates a CyclicBarrier object for the given number of threads (parties)
that must wait on it. Throws IllegalArgumentException if parties is
negative or zero.

CyclicBarrier(int parties, Runnable barrierAction)
Same as the previous constructor; this constructor additionally takes
a Runnable to run when the barrier is reached. The action is run by
the last thread to arrive at the barrier.

int await()
int await(long timeout, TimeUnit unit)
Blocks until the specified number of threads have called await()
on this barrier. The method returns the arrival index of this
thread. This method can throw an InterruptedException if
the thread is interrupted while waiting for other threads or a
BrokenBarrierException if the barrier was broken for some reason
(for example, another thread was timed-out or interrupted). The
overloaded method takes a time-out period as an additional option;
this overloaded version throws a TimeoutException if the other
threads have not all arrived within the time-out period.

boolean isBroken()
Returns true if the barrier is broken. A barrier is broken if at least one
thread in that barrier was interrupted or timed-out, or if a barrier
action failed by throwing an exception.

void reset()
Resets the barrier to the initial state. If there are any threads waiting
on that barrier, they will throw a BrokenBarrierException.

import java.util.concurrent.*;

// The run() method in this thread should be called only when four players are ready to start the game
class MixedDoubleTennisGame extends Thread {
    public void run() {
        System.out.println("All four players ready, game starts \n Love all...");
    }
}

// This thread simulates the arrival of a player.
// Once a player arrives, he/she should wait for the other players to arrive
class Player extends Thread {
    CyclicBarrier waitPoint;

    public Player(CyclicBarrier barrier, String name) {
        this.setName(name);
        waitPoint = barrier;
        this.start();
    }

    public void run() {
        System.out.println("Player " + getName() + " is ready ");
        try {
            waitPoint.await(); // wait for all four players to arrive
        } catch (BrokenBarrierException | InterruptedException exception) {
            System.out.println("An exception occurred while waiting... " + exception);
        }
    }
}

// Creates a CyclicBarrier object by passing the number of threads and the thread to run
// when all the threads reach the barrier
class CyclicBarrierTest {
    public static void main(String[] args) {
        // a mixed-double tennis game requires four players; so wait for four players
        // (i.e., four threads) to join to start the game
        System.out.println("Reserving tennis court \n As soon as four players arrive, game will start");
        CyclicBarrier barrier = new CyclicBarrier(4, new MixedDoubleTennisGame());
        new Player(barrier, "G I Joe");
        new Player(barrier, "Dora");
        new Player(barrier, "Tintin");
        new Player(barrier, "Barbie");
    }
}
In one sample run, the program prints the following (the order of the player lines depends on thread scheduling):
Reserving tennis court
As soon as four players arrive, game will start
Player G I Joe is ready
Player Dora is ready
Player Tintin is ready
Player Barbie is ready
All four players ready, game starts
Love all...
Now let’s see how this program works. In the main() method you create a CyclicBarrier object. The constructor takes two arguments: the number of threads to wait for, and the Runnable to run when all the threads reach the barrier. In this case, you have four players to wait for, so you create four threads, with each thread representing a player. The second argument for the CyclicBarrier constructor is the MixedDoubleTennisGame object. It represents the game, which will start once all four players are ready. Although MixedDoubleTennisGame extends Thread, the barrier uses it only as a Runnable: its run() method is called directly by the last thread to arrive, and it is never started as a separate thread.
Inside the run() method for each Player thread, you call the await() method on the CyclicBarrier object. Once the number of awaiting threads for the CyclicBarrier object reaches four, the run() method in MixedDoubleTennisGame is called.
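The "cyclic" in the name refers to the fact that the same barrier can be reused once the waiting threads are released. The following sketch, with an invented BarrierRounds class and play() helper, lets the same set of threads meet at one barrier for several rounds and counts how often the barrier action runs.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class (not from the original post).
class BarrierRounds {
    // `players` threads meet at the same barrier `rounds` times.
    // Returns how many times the barrier action ran.
    static int play(int players, int rounds) {
        AtomicInteger roundsStarted = new AtomicInteger();
        // The action runs once per trip of the barrier, in the last thread to arrive.
        CyclicBarrier barrier =
                new CyclicBarrier(players, () -> roundsStarted.incrementAndGet());

        Thread[] threads = new Thread[players];
        for (int p = 0; p < players; p++) {
            threads[p] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[p].start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ie);
            }
        }
        return roundsStarted.get();
    }

    public static void main(String[] args) {
        System.out.println("rounds started: " + play(4, 3)); // prints: rounds started: 3
    }
}
```

A CountDownLatch, by contrast, cannot be reset once its count reaches zero, which is one reason to prefer a barrier for repeated rendezvous.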

Phaser
Phaser is useful when a few independent threads have to work in phases to complete a task. A
synchronization point is needed so that each thread can work on its part of the task, wait for the others to complete their parts,
and sync up before advancing to the next part of the task.

List of methods:

Phaser()
Creates a Phaser object with no registered parties and no parent. The
initial phase is set to 0.

Phaser(int parties)
Creates a Phaser object with a given number of threads (parties) that must arrive
to advance to the next phase; the initial phase is set to 0.

int register()
Adds a new thread (party) to this Phaser object. Returns the current
phase number. Throws an IllegalStateException if the maximum
supported parties are already registered.

int bulkRegister(int parties)
Adds the given number of unarrived parties to this Phaser object. Returns the
current phase number. Throws an IllegalStateException if the maximum
supported parties are already registered.

int arrive()
Arrives at this phase without waiting for other threads to arrive. Returns
the arrival phase number. Can throw an IllegalStateException.

int arriveAndDeregister()
Same as the previous method, but also deregisters from the Phaser object.

int arriveAndAwaitAdvance()
Arrives at this phase and waits (i.e., blocks) until other threads arrive.

int awaitAdvance(int phase)
Waits (i.e., blocks) until this Phaser object advances from the given
phase value. Returns immediately if the current phase is not equal to the
given value or if the Phaser has terminated.

int getRegisteredParties()
Returns the number of threads (parties) registered with this Phaser object.

int getArrivedParties()
Returns the number of threads (parties) arrived at the current phase of
the Phaser object.

int getUnarrivedParties()
Returns the number of threads (parties) that have not arrived when
compared to the registered parties at the current phase of the Phaser object.
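The bookkeeping methods above can be explored from a single thread, because a Phaser only counts parties and does not tie them to particular threads. The sketch below is an illustration only: the class name PhaserBookkeeping and the trace() helper are invented, and getPhase() is an additional Phaser method that is not in the list above.

```java
import java.util.concurrent.Phaser;

// Hypothetical demo class (not from the original post).
class PhaserBookkeeping {
    // Returns a trace of the Phaser's counters after each step.
    static String trace() {
        Phaser phaser = new Phaser(2); // two registered parties, phase 0
        StringBuilder out = new StringBuilder();

        // phase/unarrived before anyone arrives
        out.append(phaser.getPhase()).append('/').append(phaser.getUnarrivedParties());

        phaser.arrive(); // first party arrives and does not wait
        out.append(' ').append(phaser.getArrivedParties())
           .append('/').append(phaser.getUnarrivedParties());

        phaser.arrive(); // second party arrives, so the phase advances
        out.append(' ').append(phaser.getPhase())
           .append('/').append(phaser.getUnarrivedParties());

        phaser.arriveAndDeregister(); // one party leaves for good
        out.append(' ').append(phaser.getRegisteredParties());
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints: 0/2 1/1 1/2 1
    }
}
```

Reading the trace: the Phaser starts in phase 0 with two unarrived parties, has one arrived and one unarrived after the first arrive(), moves to phase 1 with both parties unarrived again after the second, and is left with one registered party after the deregistration.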


Consider the example of processing a delivery order in a small coffee shop. Assume that there are only three workers: a cook, a helper, and an attendant. To simplify the program logic, assume that each delivery order consists of three food items. Completing a delivery order consists of preparing the three food items one after another. To complete preparing a food item, all three workers (the cook, the helper, and the attendant) should do their part of the work. The following program shows how this situation can be implemented using the Phaser class.

import java.util.concurrent.*;

// The main thread in ProcessOrder is the master thread that oversees the Cook, Helper,
// and Attendant to make sure they do their part of the work to prepare the food items
// and complete the order delivery.
// To simplify the logic, we assume that each delivery order consists of exactly three food items.
class ProcessOrder {
    public static void main(String[] args) throws InterruptedException {
        // the Phaser is the synchronizer to make food items one by one,
        // and deliver each before moving to the next item;
        // the initial count of 1 registers the main thread itself
        Phaser deliveryOrder = new Phaser(1);
        System.out.println("Starting to process the delivery order ");
        new Worker(deliveryOrder, "Cook");
        new Worker(deliveryOrder, "Helper");
        new Worker(deliveryOrder, "Attendant");
        for (int i = 1; i <= 3; i++) {
            // prepare, mix, and deliver this food item
            deliveryOrder.arriveAndAwaitAdvance();
            System.out.println("Deliver food item no. " + i);
        }
        // work completed for this delivery order, so deregister
        deliveryOrder.arriveAndDeregister();
        System.out.println("Delivery order completed... give it to the customer");
    }
}

// The worker could be a Cook, Helper, or Attendant. Though the three work independently, they
// should all synchronize their work to do their part and complete preparing a food item.
class Worker extends Thread {
    Phaser deliveryOrder;

    Worker(Phaser order, String name) {
        deliveryOrder = order;
        this.setName(name);
        deliveryOrder.register();
        start();
    }

    public void run() {
        for (int i = 1; i <= 3; i++) {
            System.out.println("\t" + getName() + " doing his work for order no. " + i);
            if (i == 3) {
                // work completed for this delivery order, so deregister
                deliveryOrder.arriveAndDeregister();
            } else {
                deliveryOrder.arriveAndAwaitAdvance();
            }
            try {
                Thread.sleep(3000); // simulate time for preparing the food item
            } catch (InterruptedException ie) {
                // not expected in this example; just report it
                ie.printStackTrace();
            }
        }
    }
}
The program prints the following:
Starting to process the delivery order
Cook doing his work for order no. 1
Attendant doing his work for order no. 1
Helper doing his work for order no. 1
Deliver food item no. 1
Helper doing his work for order no. 2
Attendant doing his work for order no. 2
Cook doing his work for order no. 2
Deliver food item no. 2
Helper doing his work for order no. 3
Cook doing his work for order no. 3
Attendant doing his work for order no. 3
Deliver food item no. 3
Delivery order completed... give it to the customer

In this program, you create a Phaser object to synchronize the main thread with three Worker thread objects. You create the Phaser object by calling new Phaser(1), which registers one party: the main thread. When the Worker thread objects are created, they register themselves with the Phaser object, which brings the total to four parties. Alternatively, you could have called
Phaser deliveryOrder = new Phaser(4); // for four parties (the main thread and three workers)
In this case, you would not need to call the register() method on the Phaser object in the Worker thread constructor.
In this program, you’ve assumed that a delivery order consists of processing three food items, so the for loop runs three times. For each iteration, you call deliveryOrder.arriveAndAwaitAdvance(). For this statement to proceed, all three workers (the Cook, Helper, and Attendant) have to complete their part of the work to prepare the food item. You simulate “preparing food” by calling the sleep() method in the run() method for these Worker threads. These worker threads call deliveryOrder.arriveAndAwaitAdvance() for preparing each food item, and arriveAndDeregister() for the last one. As each food item is prepared (i.e., each phase is completed), the work progresses to the next phase. Once three phases are complete, the delivery order processing is complete and the program returns.

Concurrent Collections
The java.util.concurrent package provides a number of classes that are thread-safe equivalents of the ones
provided in the collections framework classes in the java.util package.
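As a small taste of these classes, the sketch below uses ConcurrentHashMap, the thread-safe counterpart of HashMap, to let several threads update the same tally without any explicit locking. The class name ConcurrentTally, the tally() helper, and the sample words are made up for this example; merge() is a standard Map method (Java 8 and later) that ConcurrentHashMap performs atomically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class (not from the original post).
class ConcurrentTally {
    // Every one of `threads` threads counts every word in `words` into one shared map.
    static Map<String, Integer> tally(String[] words, int threads) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (String word : words) {
                    // atomic "insert 1 or add 1 to the existing count"
                    counts.merge(word, 1, Integer::sum);
                }
            });
            workers[t].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ie);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = tally(new String[] {"tea", "coffee", "tea"}, 4);
        System.out.println("tea=" + counts.get("tea") + ", coffee=" + counts.get("coffee"));
        // prints: tea=8, coffee=4
    }
}
```

With a plain HashMap, the same code could lose updates or corrupt the map, because HashMap makes no guarantees under concurrent modification.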

