Concurrency in Java Tutorial: Java Multithreading Tutorial
Concurrency: Java provides the java.util.concurrent package and its
subpackages to implement concurrency. The topics are:
- Use java.util.concurrent collections
- Apply atomic variables and locks
- Use Executors and ThreadPools
- Use the parallel Fork/Join Framework
From the beginning, Java supported concurrency in the form of
low-level thread management, locks, synchronization, and APIs for concurrency.
We covered them in the preceding blog in our discussion of the Thread class,
Runnable interface, and synchronized keyword.
Since 5.0, Java also supports high-level concurrency APIs in its
java.util.concurrent package. In this blog, we’ll focus on these APIs for
concurrent programming. These high-level APIs exploit today’s multi-core
hardware, in which a single processor has multiple cores. These APIs are also
useful for exploiting concurrency in machines that support multiple processors.
Most of the Java concurrency utilities are provided in the
java.util.concurrent package. Classes to efficiently update shared variables
without using locks are provided in the java.util.concurrent.atomic subpackage.
The Lock interface and the classes deriving from it are provided in the
java.util.concurrent.locks subpackage.
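As a quick illustration of updating a shared variable without locks, here is a minimal sketch that uses AtomicInteger from the java.util.concurrent.atomic subpackage. The class name AtomicCounterDemo and the method countWithThreads() are hypothetical names made up for this example, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class AtomicCounterDemo {
    // Starts `threads` threads that each increment a shared counter
    // `incrementsPerThread` times, then returns the final value.
    static int countWithThreads(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write, no lock needed
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every worker to finish
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", ie);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000)); // prints 40000
    }
}
```

With a plain int field and counter++ instead of incrementAndGet(), some increments could be lost because the read, add, and write steps of different threads can interleave.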
Using java.util.concurrent Collections
There are many classes in the java.util.concurrent package that provide
high-level APIs for concurrent programming. In this section, we will mainly
discuss the synchronizer classes provided in this package. Following that, we
will briefly cover the important concurrent collection classes provided in the
java.util.concurrent package.
You already understand the low-level concurrency constructs (such as the use of
the synchronized keyword, Runnable interface, and Thread class for creating
threads) from the preceding blog. In the case of a shared resource that needs
to be accessed by multiple threads, access and modifications to the shared
resource need to be protected. When you use the synchronized keyword, you
employ mutexes to synchronize between threads for safe shared access. Threads
also often need to coordinate their execution to complete a bigger,
higher-level task. The wait/notify pattern discussed in the last blog is one
way to coordinate the execution of multiple threads.
Using APIs for acquiring and releasing locks (using mutexes) or invoking the
wait/notify methods on locks are low-level tasks. It is possible to build
higher-level abstractions for thread synchronization. These high-level
abstractions for synchronizing activities of two or more threads are known as
synchronizers. Synchronizers internally make use of the existing low-level APIs
for thread coordination.
The synchronizers provided in the java.util.concurrent library and
their uses are listed here:
• Semaphore controls access to one or more shared resources.
• Phaser is used to support a synchronization barrier.
• CountDownLatch allows threads to wait for a countdown to
complete.
• Exchanger supports exchanging data between two threads.
• CyclicBarrier enables threads to wait at a predefined execution
point.
Now, we’ll discuss each of these synchronizers in turn with the
help of examples.
Semaphore
A semaphore controls access to shared resources. A semaphore
maintains a counter to specify the number of resources that the semaphore
controls. Access to the resource is allowed if the counter is greater than
zero, while a zero value of the counter indicates that no resource is available
at the moment and so the access is denied.
The methods acquire() and release() are for acquiring and
releasing resources from a semaphore. If a thread calls acquire() and the
counter is zero (i.e., resources are unavailable), the thread waits until the
counter is non-zero and then gets the resource for use. Once the thread is done
using the resource, it calls release() to increment the resource availability
counter.
Note that if the number of resources is 1, then at a given time only
one thread can access the resource; in this case, using the semaphore is similar
to using a lock.
The important methods in the Semaphore class are listed here:
Semaphore(int permits)
    Constructor to create Semaphore objects with a given number of permits
    (the number of threads that can access the resource at a time). If the
    permits value is negative, release() calls must happen before any
    acquire() call can succeed.
Semaphore(int permits, boolean fair)
    Same as the previous constructor, but the extra fair option indicates
    that the permits should be allotted on a first-come-first-served basis.
void acquire()
void acquire(int permits)
    Acquires a permit if available; otherwise, it blocks until a permit
    becomes available. Can throw an InterruptedException if some other thread
    interrupts it while waiting to acquire a permit. The overloaded version
    takes a number of permits as an argument.
void acquireUninterruptibly()
    Same as the acquire() method, but this thread cannot be interrupted
    while waiting to acquire a permit.
boolean tryAcquire()
boolean tryAcquire(long timeout, TimeUnit unit)
    Acquires a permit from the semaphore if one is available at the time of
    the call and returns true; if unavailable, it returns false immediately
    (without blocking). The overloaded tryAcquire() method additionally takes
    a time-out argument: the thread blocks for up to the given time-out
    period while waiting for a permit and returns false if none becomes
    available in that time.
void release()
void release(int permits)
    Releases a permit back to the semaphore. The overloaded version specifies
    the number of permits to release.
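To see how the non-blocking and timed variants of tryAcquire() behave, here is a small single-threaded sketch with a one-permit semaphore. The class TryAcquireDemo and its attempt() method are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative only.
class TryAcquireDemo {
    // Returns the results of four acquisition attempts on a one-permit semaphore.
    static boolean[] attempt() {
        Semaphore permit = new Semaphore(1);
        boolean first = permit.tryAcquire();  // permit is free, so this succeeds
        boolean second = permit.tryAcquire(); // no permit left, returns false at once
        boolean third;
        try {
            // waits up to 100 ms for a permit; nobody releases one, so it fails
            third = permit.tryAcquire(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            third = false;
        }
        permit.release();                     // hand the permit back
        boolean fourth = permit.tryAcquire(); // succeeds again after the release
        return new boolean[] { first, second, third, fourth };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(attempt())); // prints [true, false, false, true]
    }
}
```

The second attempt fails even though the same thread holds the permit, because a semaphore only counts permits and does not track which thread acquired them.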
Let’s assume that there are two ATM machines available in an ATM
machine room. Therefore, only two people are allowed in the room at a time.
There are five people waiting outside to use the ATM machines. The situation
can be simulated by the following code, in which each ATM machine is treated
as a resource controlled by a semaphore.
import java.util.concurrent.Semaphore;

// This class simulates a situation where an ATM room has only two ATM machines
// and five people are waiting to access the machines. Since only one person can
// access an ATM machine at a given time, others wait for their turn.
class ATMRoom {
    public static void main(String[] args) {
        // assume that only two ATM machines are available in the ATM room
        Semaphore machines = new Semaphore(2);
        // list of people waiting to access the machine
        new Person(machines, "Mickey");
        new Person(machines, "Donald");
        new Person(machines, "Tom");
        new Person(machines, "Jerry");
        new Person(machines, "Casper");
    }
}

// Each Person is an independent thread, but their access to the common resource
// (two ATM machines in the ATM machine room in this case) needs to be synchronized.
class Person extends Thread {
    private final Semaphore machines;

    public Person(Semaphore machines, String name) {
        this.machines = machines;
        this.setName(name);
        this.start();
    }

    public void run() {
        try {
            System.out.println(getName() + " waiting to access an ATM machine");
            machines.acquire();
            try {
                System.out.println(getName() + " is accessing an ATM machine");
                Thread.sleep(1000); // simulate the time required for withdrawing the amount
                System.out.println(getName() + " is done using the ATM machine");
            } finally {
                // always return the permit, even if the thread was interrupted
                machines.release();
            }
        } catch (InterruptedException ie) {
            System.err.println(ie);
        }
    }
}
Here is the output of the program in one sample run:
Mickey waiting to access an ATM machine
Tom waiting to access an ATM machine
Jerry waiting to access an ATM machine
Donald waiting to access an ATM machine
Casper waiting to access an ATM machine
Tom is accessing an ATM machine
Mickey is accessing an ATM machine
Tom is done using the ATM machine
Mickey is done using the ATM machine
Jerry is accessing an ATM machine
Donald is accessing an ATM machine
Donald is done using the ATM machine
Jerry is done using the ATM machine
Casper is accessing an ATM machine
Casper is done using the ATM machine
Now let’s analyze how this program works. People waiting to access
an ATM machine are simulated by creating a Person class that extends Thread.
The run() method in the Person class acquires a permit from the semaphore,
simulates withdrawing money from the ATM machine, and releases the permit.
The main() method simulates an ATM room with two ATM machines by
creating a Semaphore object with two permits. People waiting in the queue to
access the ATM machine are implemented by creating Person threads that all
share the same Semaphore object.
As you can see from the program output, the semaphore allows only
two threads at a time and the other threads keep waiting. When a thread
releases a permit, another thread acquires it. Cool, isn’t it?
CountDownLatch
This synchronizer allows one or more threads to wait for a
countdown to complete. This countdown could be for a set of events to happen or
until a set of operations being performed in other threads completes.
The important methods in this class are listed here:
CountDownLatch(int count)
    Creates an instance of CountDownLatch with the number of times the
    countDown() method must be called before the threads waiting with await()
    can continue execution.
void await()
    If the current count in the CountDownLatch object is zero, it returns
    immediately; otherwise, the thread blocks until the countdown reaches
    zero. Can throw an InterruptedException.
boolean await(long timeout, TimeUnit unit)
    Same as the previous method, await(), but takes an additional time-out
    argument. If the thread returns because the count reached zero, this
    method returns true; if the thread returns because of a time-out, it
    returns false.
void countDown()
    Reduces the count by one in this CountDownLatch object. If the count
    reaches zero, all the waiting threads are released. If the current count
    is already zero, nothing happens.
long getCount()
    Returns the pending count in this CountDownLatch object.
When you create a CountDownLatch, you initialize it with an
integer, which represents a count value. Threads would wait (by calling the
await() method) for this count to reach zero. Once zero is reached, all threads
are released; any other calls to await() would return immediately since the
count is already zero. The counter value can be decremented by one by calling
the countDown() method. You can get the current value of the counter using the
getCount() method.
import java.util.concurrent.*;

// This class simulates the start of a running race by counting down from 5. It holds
// three runner threads ready at the start line of the race, and once the countdown
// reaches zero, all three runners start running.
class RunningRaceStarter {
    public static void main(String[] args) throws InterruptedException {
        // count from 5 to 0 and then start the race
        CountDownLatch counter = new CountDownLatch(5);
        // instantiate three runner threads
        new Runner(counter, "Carl");
        new Runner(counter, "Joe");
        new Runner(counter, "Jack");
        System.out.println("Starting the countdown ");
        long countVal = counter.getCount();
        while (countVal > 0) {
            Thread.sleep(1000); // 1000 milliseconds = 1 second
            System.out.println(countVal);
            if (countVal == 1) {
                // once counter.countDown() in the next statement is called,
                // the countdown will reach zero; so shout "Start"
                System.out.println("Start");
            }
            counter.countDown(); // count down by 1 for each second
            countVal = counter.getCount();
        }
    }
}

// This Runner class simulates a track runner in a 100-meter dash race. The runner
// waits until the countdown timer gets to zero and then starts running.
class Runner extends Thread {
    private CountDownLatch timer;

    public Runner(CountDownLatch cdl, String name) {
        timer = cdl;
        this.setName(name);
        System.out.println(this.getName() + " ready and waiting for the count down to start");
        start();
    }

    public void run() {
        try {
            // wait for the timer count down to reach 0
            timer.await();
        } catch (InterruptedException ie) {
            System.err.println("interrupted -- can't start running the race");
            return; // an interrupted runner does not start the race
        }
        System.out.println(this.getName() + " started running");
    }
}
This program prints the following:
Carl ready and waiting for the count down to start
Joe ready and waiting for the count down to start
Jack ready and waiting for the count down to start
Starting the countdown
5
4
3
2
1
Start
Joe started running
Carl started running
Jack started running
Let’s consider how the program works. The class Runner
simulates a runner in a running race waiting to start running.
It waits for the race to start by calling the await() method on
the CountDownLatch object passed through the constructor.
The RunningRaceStarter class creates a CountDownLatch object. This
counter object is initialized with the count value 5, which means the countdown
is from 5 to 0. In the main() method, you create Runner objects; these three
threads wait on the counter object. For each second, you call the countDown()
method, which decrements count by 1.
Once the count reaches zero, all three waiting threads are
released and they automatically continue execution.
Note: In this program, the sequence in
which Joe, Carl, or Jack is printed cannot be predicted since it depends on
thread scheduling. So, if you run this program, you may get these
three names printed in some other order.
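The race example uses the latch as a starting gun. The other use mentioned earlier, waiting until a set of operations in other threads completes, can be sketched as follows. The class WaitForWorkersDemo and the method runWorkers() are hypothetical names, the "work" is only a counter increment standing in for a real task, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class WaitForWorkersDemo {
    // Starts `workerCount` threads, waits for all of them to finish,
    // and returns how many reported completion.
    static int runWorkers(int workerCount) {
        CountDownLatch done = new CountDownLatch(workerCount);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                try {
                    completed.incrementAndGet(); // stand-in for real work
                } finally {
                    done.countDown(); // always signal, even if the work fails
                }
            }).start();
        }
        try {
            done.await(); // blocks until every worker has counted down
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ie);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(5)); // prints 5
    }
}
```

Here the roles are reversed compared with the race: the workers call countDown() and the main thread is the one that waits in await().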
Exchanger
The Exchanger class is meant for exchanging data between two
threads. What Exchanger does is something very simple: it waits until both the
threads have called the exchange() method. When both threads have called the
exchange() method, the Exchanger object exchanges the data passed by the
threads with each other. This class is useful when two threads need to
synchronize between them and continuously exchange data.
This class is a tiny class with only one method: exchange().
Note that this exchange() method has an overloaded form where it
takes a time-out period as an argument. The following example simulates silly
talk between the Java Duke mascot and the coffee shop. The two threads,
DukeThread and CoffeeShopThread, run independently. However, for a chat to
happen, they need to listen when the other is talking. An Exchanger object
provides a means for them to talk to each other.
import java.util.concurrent.Exchanger;

// The DukeThread class runs as an independent thread. It talks to the CoffeeShopThread
// that also runs independently. The chat is achieved by exchanging messages through a
// common Exchanger<String> object that synchronizes the chat between them.
// Note that the messages printed are the "responses" received from CoffeeShopThread.
class DukeThread extends Thread {
    private Exchanger<String> sillyTalk;

    public DukeThread(Exchanger<String> args) {
        sillyTalk = args;
    }

    public void run() {
        String reply = null;
        try {
            // start the conversation with CoffeeShopThread
            reply = sillyTalk.exchange("Knock knock!");
            // now, print the response received from CoffeeShopThread
            System.out.println("CoffeeShop: " + reply);
            // exchange another set of messages
            reply = sillyTalk.exchange("Duke");
            // now, print the response received from CoffeeShopThread
            System.out.println("CoffeeShop: " + reply);
            // an exchange can happen only when both a send and a receive happen;
            // since this is the last sentence to speak, we close the chat by
            // ignoring the "dummy" reply
            reply = sillyTalk.exchange("The one who was born in this coffee shop!");
            // talk over, so ignore the reply!
        } catch (InterruptedException ie) {
            System.err.println("Got interrupted during my silly talk");
        }
    }
}

class CoffeeShopThread extends Thread {
    private Exchanger<String> sillyTalk;

    public CoffeeShopThread(Exchanger<String> args) {
        sillyTalk = args;
    }

    public void run() {
        String reply = null;
        try {
            // exchange the first messages
            reply = sillyTalk.exchange("Who's there?");
            // print what Duke said
            System.out.println("Duke: " + reply);
            // exchange second message
            reply = sillyTalk.exchange("Duke who?");
            // print what Duke said
            System.out.println("Duke: " + reply);
            // there is no message to send, but to get a message from the Duke thread,
            // both ends should send a message; so send a "dummy" string
            reply = sillyTalk.exchange("");
            System.out.println("Duke: " + reply);
        } catch (InterruptedException ie) {
            System.err.println("Got interrupted during my silly talk");
        }
    }
}

// Coordinate the silly talk between Duke and CoffeeShop by instantiating the
// Exchanger object and the CoffeeShop and Duke threads
class KnockKnock {
    public static void main(String[] args) {
        Exchanger<String> sillyTalk = new Exchanger<String>();
        new CoffeeShopThread(sillyTalk).start();
        new DukeThread(sillyTalk).start();
    }
}
The program prints the following:
Duke: Knock knock!
CoffeeShop: Who's there?
Duke: Duke
CoffeeShop: Duke who?
Duke: The one who was born in this coffee shop!
The comments inside the program explain how the
program works. The main concept to understand with this example is that
Exchanger helps coordinate (i.e., synchronize) exchanging messages between two
threads. Both the threads wait for each other and use the exchange() method to
exchange messages.
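The overloaded exchange() with a time-out is handy when the partner thread may never show up. Here is a minimal sketch in which a thread offers a message while no other thread is exchanging, so the call gives up after the time-out. The class TimedExchangeDemo, the method exchangeAlone(), and the returned strings are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; the names are illustrative only.
class TimedExchangeDemo {
    // Offers a message on a fresh Exchanger that no other thread uses and
    // reports what happened.
    static String exchangeAlone(long timeoutMillis) {
        Exchanger<String> exchanger = new Exchanger<>();
        try {
            // blocks until a partner arrives or the time-out elapses
            return exchanger.exchange("Anyone there?", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
            return "no partner"; // nobody called exchange() in time
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(exchangeAlone(200)); // prints no partner
    }
}
```

Without the time-out argument, the same call would block forever, because exchange() returns only when a second thread reaches the exchange point.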
CyclicBarrier
There are many situations in concurrent programming where threads
may need to wait at a predefined execution point until all other threads reach
that point. CyclicBarrier helps provide such a synchronization point. The
important methods in this class are listed here:
CyclicBarrier(int numThreads)
    Creates a CyclicBarrier object with the specified number of threads
    waiting on it. Throws IllegalArgumentException if numThreads is negative
    or zero.
CyclicBarrier(int parties, Runnable barrierAction)
    Same as the previous constructor; this constructor additionally takes
    the Runnable to run when the barrier is reached.
int await()
int await(long timeout, TimeUnit unit)
    Blocks until the specified number of threads have called await() on this
    barrier. The method returns the arrival index of this thread. This method
    can throw an InterruptedException if the thread is interrupted while
    waiting for other threads or a BrokenBarrierException if the barrier was
    broken for some reason (for example, another thread timed out or was
    interrupted). The overloaded method takes a time-out period as an
    additional option; this overloaded version throws a TimeoutException if
    the other threads have not all arrived within the time-out period.
boolean isBroken()
    Returns true if the barrier is broken. A barrier is broken if at least
    one thread waiting on that barrier was interrupted or timed out, or if a
    barrier action failed by throwing an exception.
void reset()
    Resets the barrier to the initial state. If there are any threads waiting
    on that barrier, they will throw a BrokenBarrierException.
import java.util.concurrent.*;

// The run() method in this thread should be called only when four players
// are ready to start the game
class MixedDoubleTennisGame extends Thread {
    public void run() {
        System.out.println("All four players ready, game starts \n Love all...");
    }
}

// This thread simulates arrival of a player.
// Once a player arrives, he/she should wait for other players to arrive
class Player extends Thread {
    CyclicBarrier waitPoint;

    public Player(CyclicBarrier barrier, String name) {
        this.setName(name);
        waitPoint = barrier;
        this.start();
    }

    public void run() {
        System.out.println("Player " + getName() + " is ready ");
        try {
            waitPoint.await(); // await for all four players to arrive
        } catch (BrokenBarrierException | InterruptedException exception) {
            System.out.println("An exception occurred while waiting... " + exception);
        }
    }
}

// Creates a CyclicBarrier object by passing the number of threads and the thread
// to run when all the threads reach the barrier
class CyclicBarrierTest {
    public static void main(String[] args) {
        // a mixed-double tennis game requires four players; so wait for four players
        // (i.e., four threads) to join to start the game
        System.out.println("Reserving tennis court \n As soon as four players arrive, game will start");
        CyclicBarrier barrier = new CyclicBarrier(4, new MixedDoubleTennisGame());
        new Player(barrier, "G I Joe");
        new Player(barrier, "Dora");
        new Player(barrier, "Tintin");
        new Player(barrier, "Barbie");
    }
}
The program prints the following:
Reserving tennis court
As soon as four players arrive, game will start
Player G I Joe is ready
Player Dora is ready
Player Tintin is ready
Player Barbie is ready
All four players ready, game starts
Love all...
Now let’s see how this program works. In the main() method you
create a CyclicBarrier object. The constructor takes two arguments: the number
of threads to wait for, and the Runnable to run when all the threads reach the
barrier. In this case, you have four players to wait for, so you create four
threads, with each thread representing a player. The second argument for the
CyclicBarrier constructor is the MixedDoubleTennisGame object (a Thread, which
is also a Runnable) since it represents the game, which will start once all
four players are ready.
Inside the run() method for each Player thread, you call the
await() method on the CyclicBarrier object. Once the number of awaiting threads
for the CyclicBarrier object reaches four, the run() method in
MixedDoubleTennisGame is called.
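The "cyclic" part of the name means the same barrier can be reused once the waiting threads are released. The following sketch sends a group of threads through several rounds on one CyclicBarrier and counts how often the barrier action runs. The class BarrierReuseDemo and the method play() are hypothetical names for this illustration, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are illustrative only.
class BarrierReuseDemo {
    // Sends `parties` threads through `rounds` barrier points and returns
    // how many times the barrier action ran.
    static int play(int parties, int rounds) {
        AtomicInteger gamesStarted = new AtomicInteger();
        // the barrier action runs once each time all parties have arrived
        CyclicBarrier barrier =
            new CyclicBarrier(parties, () -> gamesStarted.incrementAndGet());
        Thread[] players = new Thread[parties];
        for (int p = 0; p < parties; p++) {
            players[p] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.err.println("barrier wait failed: " + e);
                }
            });
            players[p].start();
        }
        for (Thread player : players) {
            try {
                player.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", ie);
            }
        }
        return gamesStarted.get();
    }

    public static void main(String[] args) {
        System.out.println(play(4, 3)); // prints 3
    }
}
```

A CountDownLatch could not be used this way, since its count cannot be reset once it reaches zero.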
Phaser
Phaser is a useful feature when a few independent threads have to
work in phases to complete a task. So, a synchronization point is needed for
the threads to work on a part of a task, wait for others to complete other
parts of the task, and do a sync-up before advancing to complete the next part
of the task.
List of methods:
Phaser()
    Creates a Phaser object with no registered parties and no parent. The
    initial phase is set to 0.
Phaser(int numThreads)
    Creates a Phaser object with a given number of threads (parties) to
    arrive to advance to the next stage; the initial phase is set to 0.
int register()
    Adds a new thread (party) to this Phaser object. Returns the current
    phase number. Throws an IllegalStateException if the maximum number of
    supported parties is already registered.
int bulkRegister(int numThreads)
    Adds numThreads unarrived parties to this Phaser object. Returns the
    current phase number. Throws an IllegalStateException if the maximum
    number of supported parties is already registered.
int arrive()
    Arrives at this phase without waiting for other threads to arrive.
    Returns the arrival phase number. Can throw an IllegalStateException.
int arriveAndDeregister()
    Same as the previous method, but also deregisters from the Phaser object.
int arriveAndAwaitAdvance()
    Arrives at this phase and waits (i.e., blocks) until other threads
    arrive.
int awaitAdvance(int phase)
    Waits (i.e., blocks) until this Phaser object advances from the given
    phase value; returns immediately if the current phase is not equal to
    the given phase value.
int getRegisteredParties()
    Returns the number of threads (parties) registered with this Phaser
    object.
int getArrivedParties()
    Returns the number of threads (parties) that have arrived at the current
    phase of the Phaser object.
int getUnarrivedParties()
    Returns the number of registered threads (parties) that have not yet
    arrived at the current phase of the Phaser object.
Consider the example of processing a delivery order in a small
coffee shop. Assume that there are only three workers: a cook, a helper, and an
attendant. To simplify the program logic, assume that each delivery order
consists of three food items. Completing a delivery order consists of preparing
the three food items one after another. To complete preparing a food item, all
three workers (the cook, the helper, and the attendant) should do their part of
the work. The following program shows how this situation can be implemented
using the Phaser class.
import java.util.concurrent.*;

// The ProcessOrder main thread is the master thread overseeing that the Cook, Helper,
// and Attendant are doing their part of the work to complete preparing the food items
// and complete the order delivery.
// To simplify the logic, we assume that each delivery order consists of exactly
// three food items.
class ProcessOrder {
    public static void main(String[] args) throws InterruptedException {
        // the Phaser is the synchronizer to make food items one-by-one,
        // and deliver it before moving to the next item;
        // the initial party count of 1 registers the main thread itself
        Phaser deliveryOrder = new Phaser(1);
        System.out.println("Starting to process the delivery order ");
        new Worker(deliveryOrder, "Cook");
        new Worker(deliveryOrder, "Helper");
        new Worker(deliveryOrder, "Attendant");
        for (int i = 1; i <= 3; i++) {
            // Prepare, mix and deliver this food item
            deliveryOrder.arriveAndAwaitAdvance();
            System.out.println("Deliver food item no. " + i);
        }
        // work completed for this delivery order, so deregister
        deliveryOrder.arriveAndDeregister();
        System.out.println("Delivery order completed... give it to the customer");
    }
}

// The worker could be a Cook, Helper, or Attendant. Though the three work independently,
// they should all synchronize their work together to do their part and complete
// preparing a food item.
class Worker extends Thread {
    Phaser deliveryOrder;

    Worker(Phaser order, String name) {
        deliveryOrder = order;
        this.setName(name);
        deliveryOrder.register();
        start();
    }

    public void run() {
        for (int i = 1; i <= 3; i++) {
            System.out.println("\t" + getName() + " doing his work for order no. " + i);
            try {
                // simulate time for preparing the food item before reporting arrival
                Thread.sleep(3000);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
            if (i == 3) {
                // work completed for this delivery order, so deregister
                deliveryOrder.arriveAndDeregister();
            } else {
                deliveryOrder.arriveAndAwaitAdvance();
            }
        }
    }
}
The program prints the following:
Starting to process the delivery order
Cook doing his work for order no. 1
Attendant doing his work for order no. 1
Helper doing his work for order no. 1
Deliver food item no. 1
Helper doing his work for order no. 2
Attendant doing his work for order no. 2
Cook doing his work for order no. 2
Deliver food item no. 2
Helper doing his work for order no. 3
Cook doing his work for order no. 3
Attendant doing his work for order no. 3
Deliver food item no. 3
Delivery order completed . . . give it to the customer
In this program, you create a Phaser object to support
the synchronizing of three Worker thread objects. You create the Phaser object
by calling new Phaser(1), which registers one party: the main thread. When the
Worker thread objects are created, they register themselves with the Phaser
object. Alternatively, you could have called
Phaser deliveryOrder = new Phaser(4); // for four parties (the main thread and three workers)
In this case, you would not need to call the register() method on
the Phaser object in the Worker thread constructor.
In this program, you’ve assumed that a delivery order consists of
processing three food items, so the for loop runs three times. For each
iteration, you call deliveryOrder.arriveAndAwaitAdvance(). For this statement
to proceed, all three parties (the Cook, Helper, and Attendant) have to
complete their part of the work to prepare the food item. You simulate
“preparing food” by calling the sleep() method in the run() method for these
Worker threads. These worker threads call deliveryOrder.arriveAndAwaitAdvance()
for the first two food items and deliveryOrder.arriveAndDeregister() for the
last one. As each food item is prepared (i.e., each phase is completed), the
work progresses to the next phase. Once three phases are complete, the delivery
order processing is complete and the program returns.
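To make the bookkeeping behind parties and phases visible, here is a small single-threaded sketch that registers two parties and watches the counters change as they arrive. The class PhaserStateDemo and the method inspect() are hypothetical names. The sketch also calls getPhase(), a Phaser method that is not in the list above; it returns the current phase number.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class; the names are illustrative only.
class PhaserStateDemo {
    // Returns {registered parties, unarrived parties after one arrival,
    //          phase before the last arrival, phase after the last arrival}.
    static int[] inspect() {
        Phaser phaser = new Phaser(1);   // one party registered up front
        phaser.register();               // a second party joins
        int registered = phaser.getRegisteredParties();
        phaser.arrive();                 // first party arrives without waiting
        int unarrived = phaser.getUnarrivedParties();
        int phaseBefore = phaser.getPhase();
        phaser.arrive();                 // last party arrives, so the phase advances
        int phaseAfter = phaser.getPhase();
        return new int[] { registered, unarrived, phaseBefore, phaseAfter };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(inspect())); // prints [2, 1, 0, 1]
    }
}
```

This mirrors what happens in the coffee shop program: a phase advances only when every registered party has arrived, which is why the main thread has to be counted as a party too.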
Concurrent Collections
The java.util.concurrent package provides a number of classes that
are thread-safe equivalents of the collection classes provided in the
java.util package.
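As one example of such a class, ConcurrentHashMap can stand in for HashMap when several threads update the same map. The sketch below is only an illustration: the class ConcurrentMapDemo, the method countHello(), and the "hello" key are hypothetical, and merge() with a method reference assumes Java 8 or later.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; the names are illustrative only.
class ConcurrentMapDemo {
    // Lets `threads` threads each add 1 to the same map entry `repeats` times
    // and returns the final count stored in the map.
    static int countHello(int threads, int repeats) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < repeats; i++) {
                    // merge() performs the read-add-write atomically for this key
                    counts.merge("hello", 1, Integer::sum);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", ie);
            }
        }
        return counts.getOrDefault("hello", 0);
    }

    public static void main(String[] args) {
        System.out.println(countHello(4, 1000)); // prints 4000
    }
}
```

With a plain HashMap, concurrent updates like these are not safe and the count could come out wrong or the map could be corrupted.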