Java Multithreading Programming

November 1, 2019 · JAVA

Yesterday evening, while revisiting the book “Advanced Java: Multithreading and Parallel Programming” by Liang Yung, I thought it would be a good idea to take the opportunity to document my understanding.

Java provides built-in support for multithreading.

  • A thread is a single sequential flow of control within a process, and multiple threads can run concurrently within a process, each performing different tasks.
  • Multithreading is a specialized form of multitasking that consumes fewer resources than running separate processes, because the threads of a process share its memory space.
  • A process contains the memory space allocated by the operating system and includes one or more threads. Threads cannot exist independently but must be part of a process. A process continues running until all non-daemon threads complete execution.
  • Multithreading allows developers to write efficient programs that fully utilize CPU resources.

Thread States

A thread is a dynamic execution entity that has different states throughout its lifecycle.

  1. New:

    • A thread is in a new state when it is created using the new keyword with the Thread class or its subclass. It remains in this state until the program starts the thread using the start() method.
  2. Runnable:

    • After invoking the start() method, the thread enters the runnable state and waits in the ready queue to be allocated CPU resources by the JVM thread scheduler.
  3. Running:

    • Once the thread gets CPU resources, it enters the running state and executes the run() method. In the running state, a thread can transition to blocked, runnable, or terminated states.
  4. Blocked:

    • When a running thread gives up the CPU, for example by calling sleep() or the deprecated suspend(), it stops running and enters the blocked state. Once the sleep time elapses or the resource it needs becomes available again, it reenters the runnable state. Blocking falls into the three categories below.
  5. Waiting Blocked:

    • A running thread that calls wait() enters the waiting blocked state and stays there until another thread calls notify() or notifyAll() on the same object.
  6. Synchronized Blocked:

    • A thread that tries to acquire a synchronized lock held by another thread enters the synchronized blocked state until the lock is released.
  7. Other Blocked:

    • A thread that calls sleep() or join(), or issues an I/O request, enters the other blocked state. Once the sleep times out, the joined thread terminates, or the I/O completes, it reenters the runnable state.
  8. Terminated:

    • A thread enters the terminated state once it has completed its execution or met some terminating conditions.
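
The list above is a conceptual model. The JVM itself reports a slightly different set of states through Thread.getState(), which returns a Thread.State value (NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, or TERMINATED). The following is a minimal sketch for observing the first and last of them; the class and method names (ThreadStateDemo, observe) are made up for illustration.

```java
class ThreadStateDemo {
  // Returns the state seen before start() and the state seen after join().
  static Thread.State[] observe() throws InterruptedException {
    Thread t = new Thread(() -> { /* empty task */ });
    Thread.State before = t.getState(); // NEW: created but not started
    t.start();
    t.join();                           // wait until the thread has finished
    Thread.State after = t.getState();  // TERMINATED: run() has returned
    return new Thread.State[] { before, after };
  }

  public static void main(String[] args) throws InterruptedException {
    for (Thread.State state : observe()) {
      System.out.println(state);
    }
  }
}
```

The intermediate states are harder to observe reliably because they depend on timing, which is why the sketch only checks the two endpoints.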

Creating Task Classes and Threads

  • A task in Java is an object whose class implements the Runnable interface, which declares a single run() method. You implement run() to define the task’s behavior.
  • Threads are created through the Thread class, which also contains methods for controlling the thread.

Creating a thread is always based on a task:

Thread thread = new Thread(new TaskClass());
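// Calling thread.start() asks the JVM to execute TaskClass's run() method on the new thread.
// run() begins once the scheduler gives that thread the CPU, which is not necessarily immediately.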
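
To make the task/thread split concrete, here is a small self-contained sketch. CountTask and TaskDemo are hypothetical names chosen for this example, not classes from the book. The task sums the integers from 1 to a limit, and the caller uses join() to wait for the result.

```java
class CountTask implements Runnable {
  private final int limit;
  private long sum; // written by the worker thread, read after join()

  CountTask(int limit) {
    this.limit = limit;
  }

  @Override
  public void run() {
    for (int i = 1; i <= limit; i++) {
      sum += i;
    }
  }

  long getSum() {
    return sum;
  }
}

class TaskDemo {
  static long runInThread(int limit) throws InterruptedException {
    CountTask task = new CountTask(limit);
    Thread thread = new Thread(task);
    thread.start(); // schedules run() on the new thread
    thread.join();  // waits for completion; the result is visible afterwards
    return task.getSum();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runInThread(100)); // prints 5050
  }
}
```

Calling task.run() directly would also compute the sum, but on the calling thread; only start() creates a new flow of control.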

Other Methods in the Thread Class

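  • yield(): Hints to the scheduler that the current thread is willing to give up the CPU so other threads can run. The scheduler is free to ignore the hint.
  • sleep(): Pauses the current thread for a specified number of milliseconds, allowing other threads to run.

    Note: sleep() may throw an InterruptedException, which is a checked exception, meaning Java requires you to either catch it in a try block or declare it in the method’s throws clause.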
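
One common way to handle the checked exception is sketched below, assuming the caller only needs to know whether the pause was cut short. SleepDemo and sleepQuietly are hypothetical names. The catch block restores the thread's interrupt flag so code further up the call stack can still react to the interruption.

```java
class SleepDemo {
  // Sleeps for up to the given time; returns true if the sleep was interrupted.
  static boolean sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
      return false;
    } catch (InterruptedException ex) {
      // Catching the exception clears the interrupt flag, so set it again.
      Thread.currentThread().interrupt();
      return true;
    }
  }

  public static void main(String[] args) {
    boolean interrupted = sleepQuietly(100);
    System.out.println("interrupted = " + interrupted);
  }
}
```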

Thread Priorities

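Threads have priorities, ranging from Thread.MIN_PRIORITY (1) to Thread.MAX_PRIORITY (10), with Thread.NORM_PRIORITY (5) as the usual default. The scheduler generally prefers higher-priority runnable threads, and threads of equal priority are typically scheduled round-robin. The exact behavior depends on the JVM and the operating system, so priorities are best treated as hints rather than guarantees.

Call setPriority(int) on a Thread object to set that thread’s priority.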
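
As a brief illustration, the sketch below builds a thread and lowers its priority before starting it. PriorityDemo and lowPriorityThread are hypothetical names; setPriority(), getPriority(), and the three priority constants are part of the Thread class.

```java
class PriorityDemo {
  // Builds (but does not start) a thread with the lowest priority.
  static Thread lowPriorityThread(Runnable task) {
    Thread t = new Thread(task);
    t.setPriority(Thread.MIN_PRIORITY); // 1; NORM_PRIORITY is 5, MAX_PRIORITY is 10
    return t;
  }

  public static void main(String[] args) throws InterruptedException {
    Thread t = lowPriorityThread(() -> System.out.println("background work"));
    System.out.println("priority = " + t.getPriority());
    t.start();
    t.join();
  }
}
```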

Thread Pool

If you need to create a thread for each of many tasks, starting new threads for each task can limit throughput and degrade performance. Using a thread pool is an ideal solution for managing the concurrent execution of tasks.

Java provides the Executor interface for executing tasks in a thread pool, and its subinterface ExecutorService for managing and controlling those tasks. Executor objects are created through static methods of the Executors class, such as newFixedThreadPool(int) (which creates a pool with a fixed number of threads) or newCachedThreadPool() (which creates new threads as needed and reuses idle ones).

Example:

import java.util.concurrent.*;

public class ExecutorDemo {
  public static void main(String[] args) {
    // Create a fixed thread pool with a maximum of three threads
    ExecutorService executor = Executors.newFixedThreadPool(3);
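    // Submit runnable tasks to the executor. PrintChar and PrintNum are
    // Runnable task classes defined elsewhere (not shown here).
    executor.execute(new PrintChar('a', 100));
    executor.execute(new PrintChar('b', 100));
    executor.execute(new PrintNum(100));
    // Stop accepting new tasks; tasks already submitted still run to completion
    executor.shutdown();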
  }
}

Thread pools provide a better way to manage threads. They primarily address issues related to the overhead of thread lifecycle and resource limitations:

  • Thread pools reduce the time and system resources spent on creating and destroying threads. By reusing threads across multiple tasks, the cost of creating threads is amortized. Since threads already exist when new requests come in, they eliminate the latency caused by thread creation, allowing the application to respond faster.
  • Thread pools make threads easier to manage, e.g., a scheduled thread pool (created with Executors.newScheduledThreadPool(int)) can execute tasks after a delay or on a repeating schedule.
  • They control concurrency levels, preventing resource contention when many threads compete for CPU resources.
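
The following sketch shows a pool reusing three threads for many small tasks and collecting their results. It uses submit() with a Callable so each task returns a value through a Future. PoolSumDemo and sumOfSquares are hypothetical names; the executor calls are standard java.util.concurrent API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolSumDemo {
  // Computes 1^2 + 2^2 + ... + n^2, one pooled task per term.
  static int sumOfSquares(int n) throws InterruptedException, ExecutionException {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    try {
      List<Future<Integer>> results = new ArrayList<>();
      for (int i = 1; i <= n; i++) {
        final int x = i;
        results.add(executor.submit(() -> x * x)); // Callable<Integer>
      }
      int total = 0;
      for (Future<Integer> result : results) {
        total += result.get(); // blocks until that task has finished
      }
      return total;
    } finally {
      executor.shutdown(); // let the pool threads exit
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(sumOfSquares(10)); // prints 385
  }
}
```

One task per term is far too fine-grained for real work; it is used here only to show ten tasks sharing three threads.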

Thread Synchronization

If multiple threads access the same resource simultaneously, the data may become corrupted. When two tasks access a shared resource in a way that conflicts, and the outcome depends on how their operations happen to interleave, they are said to be in a race condition. A class or program that never produces race conditions when used by multiple threads is considered thread-safe.

To avoid race conditions, threads must be synchronized so that only one of them at a time can execute a particular section of the program, known as the critical region.
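
A shared counter is the classic illustration. In the sketch below (Counter and CounterDemo are hypothetical names), count++ is a read-modify-write sequence, so without protection two threads could overwrite each other's updates. Marking the methods with the synchronized keyword, covered in the next section, makes each increment exclusive.

```java
class Counter {
  private int count;

  // Only one thread at a time can run a synchronized method on the same Counter.
  synchronized void increment() {
    count++;
  }

  synchronized int get() {
    return count;
  }
}

class CounterDemo {
  static int run(int threads, int incrementsPerThread) throws InterruptedException {
    Counter counter = new Counter();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < incrementsPerThread; j++) {
          counter.increment();
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    return counter.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run(4, 100000)); // prints 400000
  }
}
```

Removing synchronized from increment() would typically make the printed total fall short of 400000, although the exact value would vary from run to run.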

Methods for Synchronizing Threads

Before executing a synchronized method, a lock must be obtained. Locks provide exclusive access to a shared resource. For instance methods, the object is locked; for static methods, the class is locked.

  • synchronized keyword:

You can apply this keyword to methods or blocks of code.

synchronized (expr) { // expr must evaluate to an object reference; that object is locked
  // do something
}

public synchronized void func() {}

  • Lock-based synchronization: Locks and conditions can be used explicitly for thread synchronization.

    A lock is an instance of the Lock interface, which provides methods to acquire and release locks. ReentrantLock is a concrete implementation of Lock that provides mutual exclusion.

Example:

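// Assumes the enclosing class declares the fields
// "private final Lock lock = new ReentrantLock();" and "private int balance;".
public void deposit(int amount) {
  lock.lock(); // Acquire the lock
  try {
    int newBalance = balance + amount;
    // This delay is deliberately added to magnify the
    // data corruption problem and make it easy to see.
    Thread.sleep(5);
    balance = newBalance;
  } catch (InterruptedException ex) {
    // Restore the interrupt status so callers can see the interruption
    Thread.currentThread().interrupt();
  } finally {
    lock.unlock(); // Always release the lock, even if an exception occurs
  }
}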
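
The deposit() fragment above relies on lock and balance fields that are not shown. A complete version might look like the sketch below, where Account and AccountDemo are hypothetical names and the artificial delay is left out. The try/finally pattern ensures that the lock is released on every path.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Account {
  private final Lock lock = new ReentrantLock();
  private int balance;

  void deposit(int amount) {
    lock.lock();
    try {
      balance += amount;
    } finally {
      lock.unlock();
    }
  }

  int getBalance() {
    lock.lock();
    try {
      return balance;
    } finally {
      lock.unlock();
    }
  }
}

class AccountDemo {
  // Runs the given number of one-unit deposits on a thread pool.
  static int run(int deposits) throws InterruptedException {
    Account account = new Account();
    ExecutorService executor = Executors.newCachedThreadPool();
    for (int i = 0; i < deposits; i++) {
      executor.execute(() -> account.deposit(1));
    }
    executor.shutdown();
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("deposits did not finish in time");
    }
    return account.getBalance();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run(100)); // prints 100
  }
}
```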

Avoiding Deadlocks

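A deadlock may occur when two or more threads each hold a lock while waiting for a lock held by another, which can happen when threads need locks on several shared objects at the same time. Deadlocks can be avoided by resource ordering: assign an order to the objects and make every thread acquire the locks in that order.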
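
As a sketch of resource ordering, suppose two accounts transfer money to each other at the same time. If each transfer locked its source account first, the two threads could end up holding one lock each and waiting forever. The example below assumes every account has a unique id and always locks the account with the smaller id first. OrderedAccount, TransferDemo, and the id field are hypothetical names introduced for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

class OrderedAccount {
  final int id; // assumed unique; defines the global lock order
  final ReentrantLock lock = new ReentrantLock();
  int balance;  // accessed only while holding lock

  OrderedAccount(int id, int balance) {
    this.id = id;
    this.balance = balance;
  }
}

class TransferDemo {
  static void transfer(OrderedAccount from, OrderedAccount to, int amount) {
    // Always lock the account with the smaller id first.
    OrderedAccount first = from.id < to.id ? from : to;
    OrderedAccount second = (first == from) ? to : from;
    first.lock.lock();
    try {
      second.lock.lock();
      try {
        from.balance -= amount;
        to.balance += amount;
      } finally {
        second.lock.unlock();
      }
    } finally {
      first.lock.unlock();
    }
  }

  // Two threads transfer in opposite directions; returns the final balances.
  static int[] run() throws InterruptedException {
    OrderedAccount a = new OrderedAccount(1, 1000);
    OrderedAccount b = new OrderedAccount(2, 1000);
    Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) transfer(a, b, 1); });
    Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) transfer(b, a, 2); });
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    return new int[] { a.balance, b.balance };
  }

  public static void main(String[] args) throws InterruptedException {
    int[] balances = run();
    System.out.println(balances[0] + " " + balances[1]); // prints 2000 0
  }
}
```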

Thread Collaboration

Threads can communicate by using conditions to specify what actions they should take under certain circumstances.

A condition is an object created through the Lock object’s newCondition() method. Threads can use await(), signal(), or signalAll() to communicate.

  • await(): Causes the current thread to wait until the condition is signaled.
  • signal()/signalAll(): Wakes one or all threads waiting on the condition.

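Conditions must be used with locks: a thread has to hold the lock that created the condition before calling await(), signal(), or signalAll() on it. Calling them without holding that lock results in an IllegalMonitorStateException.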
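
A minimal sketch of this cooperation, assuming a withdrawal that must wait until enough money has been deposited (ConditionAccount, ConditionDemo, and fundsAdded are hypothetical names). Note that await() is called in a loop: the waiting thread rechecks the balance each time it wakes up, because a signal only means that the balance changed, not that it is now sufficient.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConditionAccount {
  private final Lock lock = new ReentrantLock();
  private final Condition fundsAdded = lock.newCondition();
  private int balance;

  void withdraw(int amount) throws InterruptedException {
    lock.lock();
    try {
      while (balance < amount) {
        fundsAdded.await(); // releases the lock while waiting, reacquires on wake-up
      }
      balance -= amount;
    } finally {
      lock.unlock();
    }
  }

  void deposit(int amount) {
    lock.lock();
    try {
      balance += amount;
      fundsAdded.signalAll(); // wake waiting withdrawers so they recheck the balance
    } finally {
      lock.unlock();
    }
  }

  int getBalance() {
    lock.lock();
    try {
      return balance;
    } finally {
      lock.unlock();
    }
  }
}

class ConditionDemo {
  static int run() throws InterruptedException {
    ConditionAccount account = new ConditionAccount();
    Thread withdrawer = new Thread(() -> {
      try {
        account.withdraw(30);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });
    withdrawer.start();
    account.deposit(10);
    account.deposit(10);
    account.deposit(20); // the total of 40 is now enough for the withdrawal
    withdrawer.join();
    return account.getBalance();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run()); // prints 10
  }
}
```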

Blocking Queues

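Java provides blocking queues (the BlockingQueue interface in java.util.concurrent) for multithreading. They handle synchronization internally, so you do not need to use locks or conditions explicitly. On top of the usual queue operations, they support two blocking operations: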

  • When the queue is empty, a retrieval operation will block the thread until elements become available.
  • When the queue is full, an insert operation will block the thread until space becomes available.

Blocking queues are commonly used in producer-consumer scenarios. Producer threads place results in the queue, while consumer threads retrieve and process those results. A bounded blocking queue balances the workload automatically: a producer that runs ahead must wait for free space, and a consumer that runs ahead must wait for new elements.

Core Methods of BlockingQueue

  1. Adding Data:

    • put(E e): Inserts an element at the end of the queue, waiting if the queue is full.
    • offer(E e, long timeout, TimeUnit unit): Attempts to add an element, waiting up to the specified time if the queue is full. If successful, returns true; otherwise, returns false.
  2. Retrieving Data:

    • take(): Retrieves and removes the head of the queue, waiting if necessary until an element becomes available.
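    • drainTo(Collection<? super E> c): Removes all currently available elements from the queue and adds them to the given collection, without blocking. Transferring elements in bulk can improve efficiency by reducing the number of lock/unlock operations.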
    • poll(long timeout, TimeUnit unit): Retrieves and removes the head of the queue, waiting up to the specified time if the queue is empty. If no element is found within the time limit, returns null.
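
The producer-consumer pattern described above can be sketched with put() and take() on an ArrayBlockingQueue. The tiny capacity forces the producer to block whenever it gets ahead. ProducerConsumerDemo and the POISON end-of-stream marker are hypothetical choices for this example, not part of the BlockingQueue API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
  private static final int POISON = -1; // hypothetical marker telling the consumer to stop

  // Produces 1..items on one thread and sums them on another.
  static int run(int items) throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
    int[] sum = new int[1]; // written by the consumer, read after join()

    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= items; i++) {
          queue.put(i); // blocks while the queue is full
        }
        queue.put(POISON);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        // take() blocks while the queue is empty
        for (int v = queue.take(); v != POISON; v = queue.take()) {
          sum[0] += v;
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
    return sum[0];
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run(100)); // prints 5050
  }
}
```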

Parallel Programming

Java provides the Fork/Join framework for parallel programming. In this framework, a fork can be viewed as an independent task that runs on a thread.

The idea is to decompose a problem into multiple non-overlapping subproblems that can be solved independently in parallel, then join (combine) their solutions to get the overall answer.

Tasks are defined using the ForkJoinTask class and executed in a ForkJoinPool instance.

ForkJoinTask is the abstract base class for tasks. In practice you usually extend one of its two subclasses: RecursiveAction for tasks that do not return a value, or RecursiveTask for tasks that do. A ForkJoinTask is a lightweight, thread-like entity, meaning many tasks can be executed by a small number of threads in the ForkJoinPool.

Example:

import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;

public class ParallelMergeSort {
  public static void main(String[] args) {
    final int SIZE = 7000000;
    int[] list1 = new int[SIZE];
    int[] list2 = new int[SIZE];
    for (int i = 0; i < list1.length; i++)
      list1[i] = list2[i] = (int)(Math.random() * 10000000);
    long startTime = System.currentTimeMillis();
    parallelMergeSort(list1); // Invoke parallel merge sort
    long endTime = System.currentTimeMillis();
    System.out.println("\nParallel time with " +
      Runtime.getRuntime().availableProcessors() +
      " processors is " + (endTime - startTime) + " milliseconds");
    startTime = System.currentTimeMillis();
    MergeSort.mergeSort(list2); // Sequential MergeSort is in the book's Listing 23.5 (not shown here)
    endTime = System.currentTimeMillis();
    System.out.println("\nSequential time is " +
      (endTime - startTime) + " milliseconds");
  }

  public static void parallelMergeSort(int[] list) {
    RecursiveAction mainTask = new SortTask(list);
    ForkJoinPool pool = new ForkJoinPool();
    pool.invoke(mainTask);
  }

  private static class SortTask extends RecursiveAction {
    private static final int THRESHOLD = 500;
    private int[] list;

    SortTask(int[] list) {
      this.list = list;
    }

    @Override
    protected void compute() {
      if (list.length < THRESHOLD)
        java.util.Arrays.sort(list);
      else {
        // Obtain the first half
        int[] firstHalf = new int[list.length / 2];
        System.arraycopy(list, 0, firstHalf, 0, list.length / 2);

        // Obtain the second half
        int secondHalfLength = list.length - list.length / 2;
        int[] secondHalf = new int[secondHalfLength];
        System.arraycopy(list, list.length / 2, secondHalf, 0, secondHalfLength);

        // Recursively sort the two halves
        invokeAll(new SortTask(firstHalf), new SortTask(secondHalf));

        // Merge firstHalf with secondHalf into list
        MergeSort.merge(firstHalf, secondHalf, list);
      }
    }
  }
}
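
The merge sort above uses RecursiveAction because sorting happens in place and returns nothing. For tasks that produce a value, RecursiveTask follows the same fork/join pattern. The sketch below sums an array in parallel; ParallelSum, SumTask, and the threshold of 1,000 are illustrative choices rather than anything taken from the book.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ParallelSum {
  private static class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this size, sum sequentially
    private final int[] data;
    private final int lo; // inclusive
    private final int hi; // exclusive

    SumTask(int[] data, int lo, int hi) {
      this.data = data;
      this.lo = lo;
      this.hi = hi;
    }

    @Override
    protected Long compute() {
      if (hi - lo <= THRESHOLD) {
        long sum = 0;
        for (int i = lo; i < hi; i++) {
          sum += data[i];
        }
        return sum;
      }
      int mid = lo + (hi - lo) / 2;
      SumTask left = new SumTask(data, lo, mid);
      SumTask right = new SumTask(data, mid, hi);
      left.fork();                     // run the left half asynchronously
      long rightSum = right.compute(); // compute the right half on this thread
      return left.join() + rightSum;   // wait for the left half and combine
    }
  }

  static long sum(int[] data) {
    return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
  }

  public static void main(String[] args) {
    int[] data = new int[100_000];
    for (int i = 0; i < data.length; i++) {
      data[i] = i + 1;
    }
    System.out.println(sum(data)); // prints 5000050000
  }
}
```

Unlike the merge sort, this version passes index ranges instead of copying subarrays, which avoids extra allocation; that choice is independent of the fork/join mechanics.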