Java Multithreading Programming
November 1, 2019 · 1626 words · 8 min · JAVA
Yesterday evening, while revisiting the book “Advanced Java: Multithreading and Parallel Programming” by Liang Yung, I thought it would be a good idea to take the opportunity to document my understanding.
Java Multithreading Programming
Java provides built-in support for multithreading.
- A thread is a single sequential flow of control within a process, and multiple threads can run concurrently within a process, each performing different tasks.
- Multithreading is a specialized form of multitasking that consumes fewer resources.
- A process contains the memory space allocated by the operating system and includes one or more threads. Threads cannot exist independently but must be part of a process. A process continues running until all non-daemon threads complete execution.
- Multithreading allows developers to write efficient programs that fully utilize CPU resources.
Thread States
A thread is a dynamic execution entity that has different states throughout its lifecycle.
-
New:
- A thread is in a new state when it is created using the
new
keyword with theThread
class or its subclass. It remains in this state until the program starts the thread using thestart()
method.
- A thread is in a new state when it is created using the
-
Runnable:
- After invoking the
start()
method, the thread enters the runnable state and waits in the ready queue to be allocated CPU resources by the JVM thread scheduler.
- After invoking the
-
Running:
- Once the thread gets CPU resources, it enters the running state and executes the
run()
method. In the running state, a thread can transition to blocked, runnable, or terminated states.
- Once the thread gets CPU resources, it enters the running state and executes the
-
Blocked:
- When a thread executes methods like
sleep()
orsuspend()
and loses resources, it transitions to the blocked state. After resuming resources or the sleep time is over, it can reenter the runnable state.
- When a thread executes methods like
-
Waiting Blocked:
- A running thread calling the
wait()
method enters the waiting blocked state.
- A running thread calling the
-
Synchronized Blocked:
- A thread trying to acquire a synchronized lock but failing due to another thread owning the lock transitions to the synchronized blocked state.
-
Other Blocked:
- Through methods like
sleep()
,join()
, or I/O requests, a thread can enter the other blocked state. Once these operations are complete, it can reenter the runnable state.
- Through methods like
-
Terminated:
- A thread enters the terminated state once it has completed its execution or met some terminating conditions.
Creating Task Classes and Threads
- A task in Java is an object that implements the
Runnable
interface (containing therun()
method). You need to override therun()
method to define the task’s behavior. - Threads are created through the
Thread
class, which also contains methods for controlling the thread.
Creating a thread is always based on a task:
Thread thread = new Thread(new TaskClass());
// Calling thread.start() will invoke TaskClass's run() method immediately.
Other Methods in the Thread Class
yield()
: Temporarily releases the CPU to let other threads execute.sleep()
: Makes the thread sleep for a specified period to allow other threads to run.Note:
sleep()
may throw anInterruptedException
, which is a checked exception, meaning Java requires you to catch it in atry
block.
Thread Priorities
Threads have priorities. The Java Virtual Machine always gives preference to higher-priority threads. If all threads have the same priority, they follow round-robin scheduling.
Use
Thread.setPriority()
to set a thread’s priority.
Thread Pool
If you need to create a thread for each of many tasks, starting new threads for each task can limit throughput and degrade performance. Using a thread pool is an ideal solution for managing the concurrent execution of tasks.
Java provides the Executor
interface to execute tasks in a thread pool, and the ExecutorService
interface is used to manage and control those tasks. Executors are created through static methods like newFixedThreadPool(int)
(to create a pool with a fixed number of threads) or newCachedThreadPool()
(to create a pool with a dynamically managed number of threads).
Example:
import java.util.concurrent.*;
public class ExecutorDemo {
public static void main(String[] args) {
// Create a fixed thread pool with a maximum of three threads
ExecutorService executor = Executors.newFixedThreadPool(3);
// Submit runnable tasks to the executor
executor.execute(new PrintChar('a', 100));
executor.execute(new PrintChar('b', 100));
executor.execute(new PrintNum(100));
// Shut down the executor
executor.shutdown();
}
}
Thread pools provide a better way to manage threads. They primarily address issues related to the overhead of thread lifecycle and resource limitations:
- Thread pools reduce the time and system resources spent on creating and destroying threads. By reusing threads across multiple tasks, the cost of creating threads is amortized. Since threads already exist when new requests come in, they eliminate the latency caused by thread creation, allowing the application to respond faster.
- Thread pools allow easy thread management, e.g., using a
ScheduledThreadPool
to execute tasks after a delay or on a repeating schedule. - They control concurrency levels, preventing resource contention when many threads compete for CPU resources.
Thread Synchronization
If multiple threads simultaneously access the same resource, it may lead to data corruption. If two tasks interact with a shared resource in a conflicting manner, they are said to be in a race condition. Without race conditions, a program is considered thread-safe.
To prevent race conditions, threads must be synchronized to prevent multiple threads from accessing a particular section of the program simultaneously.
Methods for Synchronizing Threads
Before executing a synchronized method, a lock must be obtained. Locks provide exclusive access to a shared resource. For instance methods, the object is locked; for static methods, the class is locked.
synchronized
keyword:
You can apply this keyword to methods or blocks of code.
synchronized (expr) {
// do something
}
public synchronized void func() {}
- Lock-based synchronization:
Locks and conditions can be used explicitly for thread synchronization.
A lock is an instance of the
Lock
interface, which provides methods to acquire and release locks.ReentrantLock
is an implementation of the lock mechanism for mutual exclusion.
Example:
public void deposit(int amount) {
lock.lock(); // Acquire the lock
try {
int newBalance = balance + amount;
// This delay is deliberately added to magnify the
// data corruption problem and make it easy to see.
Thread.sleep(5);
balance = newBalance;
} catch (InterruptedException ex) {
// Handle the exception
} finally {
lock.unlock(); // Release the lock
}
}
Avoiding Deadlocks
A deadlock may occur when multiple threads need to acquire locks on several shared objects simultaneously. Deadlocks can be avoided by ordering resource acquisition.
Thread Collaboration
Threads can communicate by using conditions to specify what actions they should take under certain circumstances.
A condition is an object created through the
Lock
object’snewCondition()
method. Threads can useawait()
,signal()
, orsignalAll()
to communicate.
await()
: Causes the current thread to wait until the condition is signaled.signal()
/signalAll()
: Wakes one or all threads waiting on the condition.
Conditions must be used with locks; invoking their methods without a lock will result in an
IllegalMonitorStateException
.
Blocking Queues
Java provides blocking queues for multithreading, which allow synchronization without needing locks or conditions explicitly. They provide two additional operations:
- When the queue is empty, a retrieval operation will block the thread until elements become available.
- When the queue is full, an insert operation will block the thread until space becomes available.
Blocking queues are commonly used in producer-consumer scenarios. Producer threads place results in the queue, while consumer threads retrieve and process those results. Blocking queues automatically balance the workload between producers and consumers.
Core Methods of BlockingQueue
-
Adding Data:
put(E e)
: Inserts an element at the end of the queue, waiting if the queue is full.offer(E e, long timeout, TimeUnit unit)
: Attempts to add an element, waiting up to the specified time if the queue is full. If successful, returnstrue
; otherwise, returnsfalse
.
-
Retrieving Data:
take()
: Retrieves and removes the head of the queue, waiting if necessary until an element becomes available.drainTo()
: Retrieves and removes all available elements from the queue, improving efficiency by reducing the number of lock/unlock operations.poll(long timeout, TimeUnit unit)
: Retrieves and removes the head of the queue, waiting up to the specified time if the queue is empty. If no element is found within the time limit, returnsnull
.
Parallel Programming
Java uses the Fork/Join framework to implement parallel programming. In this framework, a Fork can be considered a separate task executed by a thread.
Decompose a problem into multiple non-overlapping subproblems that can be solved independently, then combine their solutions to get the overall answer.
Tasks are defined using the ForkJoinTask
class and executed in a ForkJoinPool
instance.
ForkJoinTask
is the base class for tasks. It’s a lightweight entity, meaning many tasks can be executed by a small number of threads in theForkJoinPool
.
Example:
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;
public class ParallelMergeSort {
public static void main(String[] args) {
final int SIZE = 7000000;
int[] list1 = new int[SIZE];
int[] list2 = new int[SIZE];
for (int i = 0; i < list1.length; i++)
list1[i] = list2[i] = (int)(Math.random() * 10000000);
long startTime = System.currentTimeMillis();
parallelMergeSort(list1); // Invoke parallel merge sort
long endTime = System.currentTimeMillis();
System.out.println("\nParallel time with " +
Runtime.getRuntime().availableProcessors() +
" processors is " + (endTime - startTime) + " milliseconds");
startTime = System.currentTimeMillis();
MergeSort.mergeSort(list2); // MergeSort is in Listing 23.5
endTime = System.currentTimeMillis();
System.out.println("\nSequential time is " +
(endTime - startTime) + " milliseconds");
}
public static void parallelMergeSort(int[] list) {
RecursiveAction mainTask = new SortTask(list);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
}
private static class SortTask extends RecursiveAction {
private final int THRESHOLD = 500;
private int[] list;
SortTask(int[] list) {
this.list = list;
}
@Override
protected void compute() {
if (list.length < THRESHOLD)
java.util.Arrays.sort(list);
else {
// Obtain the first half
int[] firstHalf = new int[list.length / 2];
System.arraycopy(list, 0, firstHalf, 0, list.length / 2);
// Obtain the second half
int secondHalfLength = list.length - list.length / 2;
int[] secondHalf = new int[secondHalfLength];
System.arraycopy(list, list.length / 2, secondHalf, 0, secondHalfLength);
// Recursively sort the two halves
invokeAll(new SortTask(firstHalf), new SortTask(secondHalf));
// Merge firstHalf with secondHalf into list
MergeSort.merge(firstHalf, secondHalf, list);
}
}
}
}