Concurrent programming

Tags
Java
Concurrent Programming
Dead Lock

Concurrent Programming Challenges

Concurrent programming presents several challenges, mainly stemming from the fact that multiple threads or processes are executing simultaneously, potentially sharing resources and data. Some of the main challenges include:
  1. Race conditions: When two or more threads access shared data simultaneously, the final value of the data might depend on the order in which the threads are executed. This can lead to unpredictable results and hard-to-reproduce bugs.
  1. Deadlocks: A situation where two or more threads are stuck waiting for each other to release a resource (e.g., a lock), causing the program to hang indefinitely. Deadlocks usually occur when multiple threads acquire locks in an inconsistent order.
  1. Livelocks: A livelock is similar to a deadlock, but in this case, threads are not completely blocked; they are just unable to make progress due to the constant change of states. A livelock can occur when two or more threads are stuck in a loop, continuously responding to each other's actions but not making any progress.
  1. Starvation: A situation where a thread is unable to make progress because it is constantly waiting for a shared resource that is always in use by other threads. Starvation can lead to reduced throughput, and in the worst case the starved task never completes.
  1. Resource contention: When multiple threads compete for the same limited resources (e.g., memory, CPU time, or I/O bandwidth), it can lead to contention and significantly impact the performance of the application.
  1. Correctness and consistency: Ensuring that concurrent operations maintain the correctness and consistency of shared data structures can be challenging, as different threads may have different views of the shared state at any given time.
  1. Complexity: Concurrent programming often introduces additional complexity to the code, making it harder to read, understand, and maintain. Debugging and testing concurrent code can also be more challenging due to the non-deterministic nature of thread execution.

Context switching

In a multitasking operating system, the scheduler manages the execution of multiple processes or threads by allocating time slices (also called "quantum") to each of them. When the allocated time slice for a process or thread expires or when a higher-priority task becomes available, the operating system performs a context switch to save the current state of the running process and restore the state of the next process to be executed. The saved state includes the values of CPU registers, program counter, stack pointer, and other relevant information.
 
Context switching has some overhead, as the operating system needs to save and restore the state of processes or threads. This overhead can impact the performance of the system, especially if the rate of context switching is high. However, context switching is a crucial feature for multitasking systems, as it allows efficient sharing of resources and provides the illusion of concurrent execution of multiple processes or threads.

CAS

Java's java.util.concurrent.atomic package relies on the compare-and-swap (CAS) operation to perform atomic updates without locks, which makes it a building block for lock-free data structures and algorithms.
  • CAS takes three arguments: a memory location, an expected value, and a new value. It compares the value at the memory location with the expected value; if they match, the new value is written to the memory location. The entire operation is atomic, meaning no other thread can observe or modify the location between the comparison and the write, which ensures consistency and correctness in concurrent environments.
  • Here's a high-level description of the CAS algorithm:
      1. Read the current value at the memory location.
      1. Compare the current value with the expected value.
      1. If the values match, update the memory location with the new value.
      1. Return a boolean indicating whether the operation was successful.
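CAS requires no locking mechanism, so a thread is never suspended while waiting for a lock. If the comparison fails because another thread changed the value first, the caller typically re-reads the value and retries. This avoids the context switches that blocking locks incur, at the cost of wasted CPU cycles when contention is high.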
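The four steps above can be sketched as a retry loop on top of AtomicInteger.compareAndSet. This is a minimal illustration, not how the JDK implements incrementAndGet internally; the class name CasCounter and the helper run are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class; AtomicInteger and compareAndSet are standard JDK APIs.
class CasCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    // Read, compute, then attempt the swap; start over if another thread won the race.
    int increment() {
        while (true) {
            int expected = value.get();                    // read the current value
            int updated = expected + 1;                    // compute the new value
            if (value.compareAndSet(expected, updated)) {  // compare and, on a match, write
                return updated;                            // the CAS succeeded
            }
            // The CAS failed because the value changed in between: loop and retry.
        }
    }

    int get() {
        return value.get();
    }

    // Starts `threads` threads that each increment `perThread` times; returns the final count.
    static int run(int threads, int perThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // no increment is lost, so this prints 4000
    }
}
```

No thread ever blocks here; a thread that loses the race simply repeats the loop with the fresh value.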

Deadlock Demo

public class DeadLockDemo {
    private static final Object HAIR_A = new Object();
    private static final Object HAIR_B = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (HAIR_A) {
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (HAIR_B) {
                    System.out.println("A successfully grabbed B's hair");
                }
            }
        }).start();

        new Thread(() -> {
            synchronized (HAIR_B) {
                synchronized (HAIR_A) {
                    System.out.println("B successfully grabbed A's hair");
                }
            }
        }).start();
    }
}
  1. Thread 1 acquires the lock on HAIR_A by entering the synchronized (HAIR_A) block.
  1. Thread 1 sleeps for 50 milliseconds to simulate some processing time.
  1. Meanwhile, Thread 2 acquires the lock on HAIR_B by entering the synchronized (HAIR_B) block.
  1. Thread 1 wakes up from sleep and tries to acquire the lock on HAIR_B by entering the synchronized (HAIR_B) block. However, it can't proceed because Thread 2 holds the lock on HAIR_B.
  1. At the same time, Thread 2 tries to acquire the lock on HAIR_A by entering the synchronized (HAIR_A) block. It can't proceed because Thread 1 holds the lock on HAIR_A.
  1. As a result, both threads are stuck waiting for each other to release the respective locks, and neither can proceed. This situation leads to a deadlock.
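One common way to avoid this deadlock is to make every thread acquire the locks in the same order. The sketch below reuses the HAIR_A and HAIR_B names from the demo; the class name LockOrderingDemo and the run helper are hypothetical.

```java
// Hypothetical variant of the demo in which both threads lock HAIR_A before HAIR_B.
class LockOrderingDemo {
    private static final Object HAIR_A = new Object();
    private static final Object HAIR_B = new Object();

    // Returns how many threads managed to hold both locks.
    static int run() {
        int[] grabs = {0};
        Runnable task = () -> {
            synchronized (HAIR_A) {       // always first
                synchronized (HAIR_B) {   // always second
                    grabs[0]++;           // guarded by both locks
                }
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return grabs[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // both threads finish, so this prints 2
    }
}
```

Because neither thread can hold HAIR_B while waiting for HAIR_A, the circular wait from the original demo cannot form.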

Thread Safety Demo

import java.util.concurrent.CountDownLatch;

public class UnSafeThread {
    private static int num = 0;
    private static CountDownLatch countDownLatch = new CountDownLatch(10);

    /**
     * Increment num by 1
     */
    public static void increment() {
        num++;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    increment();
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                countDownLatch.countDown();
            }).start();
        }
        while (true) {
            if (countDownLatch.getCount() == 0) {
                System.out.println(num);
                break;
            }
        }
    }
}
  1. The main method creates 10 threads in a loop.
  1. Each thread calls the increment() method 100 times. The method is not synchronized, so no lock protects the shared num variable.
  1. Each thread increments the num variable by 1.
  1. Each thread sleeps for 10 milliseconds to simulate some processing time.
  1. Meanwhile, other threads are being created and run concurrently, also incrementing the num variable and sleeping for 10 milliseconds.
  1. As the threads access the shared num variable without any synchronization, race conditions occur, causing the final value of num to be unpredictable.
  1. After each thread completes its execution, it calls the countDown() method on the CountDownLatch.
  1. The main thread is continuously checking the count of the CountDownLatch in a while loop.
  1. When the count of the CountDownLatch reaches 0, indicating that all threads have completed their execution, the main thread breaks out of the while loop.
  1. The main thread prints the final value of the num variable, which is the result of all threads' concurrent operations.
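For comparison, here is a sketch of a thread-safe variant that swaps the plain int for an AtomicInteger and replaces the polling loop with CountDownLatch.await(). The class name SafeCounterDemo is hypothetical, and the sleep is omitted for brevity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical thread-safe counterpart of UnSafeThread.
class SafeCounterDemo {
    static int run() {
        AtomicInteger num = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    num.incrementAndGet(); // atomic read-modify-write
                }
                latch.countDown();
            }).start();
        }
        try {
            latch.await(); // blocks until all 10 threads have counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return num.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 10 threads x 100 increments, so this prints 1000
    }
}
```

Using await() also frees the main thread from spinning on getCount() while the workers run.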

Basic Thread knowledge

Process vs Thread

Processes and threads are both ways to achieve concurrent execution in operating systems, but they have some important differences:

Process:

A process is an instance of a program that is being executed. Each process has its own separate memory space and does not share it with other processes. Because of this, processes are often called "heavyweight" or costly, as switching between different processes requires some time (a context switch) because the system needs to store and restore memory maps, registers, etc.
Processes can communicate with each other, but this communication requires some setup and coordination, as the operating system needs to provide some communication mechanism such as pipes or sockets.
Examples of process-based multitasking include running a web browser, a text editor, and a database system on a computer at the same time. Each of these tasks runs in its own process.

Thread:

A thread, also known as a lightweight process, is a unit of execution within a process. All threads within the same process share the same memory space, which means they can access the same variables and data structures, making communication between threads very efficient.
However, this shared memory model also necessitates careful coordination when accessing shared data, as multiple threads can interfere with each other (a situation known as a race condition). To prevent this, various synchronization techniques, such as mutexes, semaphores, and condition variables, can be used.
Threads are useful when different tasks can be performed concurrently within the same program, like handling multiple connections in a server or updating the user interface while performing background computations in a desktop application.

Summary

In summary, the primary difference between a process and a thread is that processes are isolated from each other with their own memory space, while threads share memory space but require synchronization to manage access to shared data.
 

Thread states

A thread can be in one of the following states during its lifecycle:
  1. New: The thread is created but not yet started.
  1. Runnable (or Running): The thread is either running or ready to run; the JVM reports both as RUNNABLE. It's the responsibility of the thread scheduler to give the thread time to run.
  1. Blocked: The thread is alive but cannot run because it is waiting to acquire a monitor lock, for example to enter a synchronized block that another thread holds.
  1. Waiting: The thread waits indefinitely for another thread to perform a particular action, for example after calling Object.wait() or Thread.join() without a timeout.
  1. Timed Waiting: The thread waits for at most a specified time, for example after calling Thread.sleep(), or Object.wait() or Thread.join() with a timeout.
  1. Terminated (or Dead): The thread's run() method has exited, either normally or because of an uncaught exception.
stateDiagram
    [*] --> New: Thread created
    New --> Runnable: Thread started
    state fork_state <<fork>>
    Runnable --> fork_state
    fork_state --> Running: Scheduler selects thread
    fork_state --> Blocked: Awaits resource
    fork_state --> Waiting: Awaits another thread
    fork_state --> TimedWaiting: Thread sleeps
    Running --> Dead: run() method completes
    Dead --> [*]
    Blocked --> Runnable: Resource available
    Waiting --> Runnable: Other thread signals
    TimedWaiting --> Runnable: Sleep time expires or gets interrupted
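Some of these states can be observed directly with Thread.getState(). The sketch below captures three of them for a single worker thread; the class name ThreadStateDemo and the observe helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo that records NEW, BLOCKED, and TERMINATED for one thread.
class ThreadStateDemo {
    static List<Thread.State> observe() {
        List<Thread.State> seen = new ArrayList<>();
        Object lock = new Object();
        Thread worker = new Thread(() -> {
            synchronized (lock) {
                // Nothing to do; the worker only needs the monitor held by the caller.
            }
        });
        seen.add(worker.getState());             // not started yet
        try {
            synchronized (lock) {
                worker.start();
                // Poll until the worker is stuck at the monitor we are holding.
                while (worker.getState() != Thread.State.BLOCKED) {
                    Thread.sleep(1);
                }
                seen.add(worker.getState());     // still blocked: we hold the lock
            }
            worker.join();                       // the lock is released, so the worker finishes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen.add(worker.getState());             // run() has returned
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints [NEW, BLOCKED, TERMINATED]
    }
}
```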
 

Creating new thread

// MyRunnable.java
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // code to execute in the new thread
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getId() + " Value " + i);
            try {
                // Sleep for a while
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// Main.java: plain threads
public class Main {
    public static void main(String[] args) {
        Thread thread1 = new Thread(new MyRunnable());
        thread1.start(); // starts the thread
        Thread thread2 = new Thread(new MyRunnable());
        thread2.start(); // starts the second thread
    }
}

// Main.java: thread pool method (an alternative to the Main class above)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2); // creating a pool of 2 threads
        Runnable worker = new MyRunnable();
        executor.execute(worker); // calling execute method of ExecutorService
        executor.execute(worker);
        executor.shutdown(); // shut down executor
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}
MyRunnable implements Runnable instead of extending Thread because Java allows a class to extend only one superclass but implement any number of interfaces.
Implementing Runnable also keeps the task separate from the thread that runs it, which makes for cleaner code.
  1. In the main method of the Main class, we create an ExecutorService with a fixed thread pool of 2 threads using Executors.newFixedThreadPool(2).
  1. Then we create an instance of MyRunnable and pass it to the execute method of the ExecutorService twice. Each call hands the task to one of the pool's worker threads, which then calls the run method of MyRunnable.
  1. The shutdown method is called on the ExecutorService to stop accepting new tasks and to allow previously submitted tasks to complete.
  1. The while loop busy-waits until isTerminated() returns true, that is, until all submitted tasks have finished after shutdown.
  1. Once all threads have completed execution, it prints "Finished all threads".
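The empty while loop keeps a CPU core busy while it waits. A gentler alternative is awaitTermination, which blocks until the pool has drained or a timeout expires. The sketch below assumes a 10-second timeout and uses a hypothetical class name PoolShutdownDemo with trivial tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: submit four small tasks to a pool of two threads and wait for them.
class PoolShutdownDemo {
    static int run() {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> completed.incrementAndGet());
        }
        executor.shutdown(); // no new tasks; queued tasks still run
        try {
            // Block instead of spinning; the 10-second limit is an arbitrary assumption.
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // give up and interrupt the workers
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished tasks: " + run()); // prints Finished tasks: 4
    }
}
```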

Thread suspension and recovery

Thread suspension is a process where a thread's execution is temporarily stopped or paused. The suspended thread can be restarted at a later point in time, a process known as thread recovery or resumption.
However, Thread.suspend(), Thread.resume(), and Thread.stop() have been deprecated since JDK 1.2 because they are inherently unsafe. A suspended thread keeps any locks it holds, so if the thread that is supposed to resume it needs one of those locks first, the program deadlocks. Similarly, abruptly stopping a thread with Thread.stop() can leave shared data in an inconsistent state.
Despite these methods being deprecated, the concepts of thread suspension and resumption are still applicable in concurrent programming. A common and safe way to suspend and resume a thread in modern Java applications involves using wait(), notify(), or notifyAll() methods provided by the Object class.
A good use case for thread suspension and recovery might be in a producer-consumer scenario, where you want the consumer to wait (suspend) when there's no data to consume and to continue (recover) when new data arrives.
public class SharedObject {
    private String data = null;

    public synchronized void produce(String data) {
        while (this.data != null) {
            try {
                wait(); // suspend the thread if data already exists
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.data = data;
        notifyAll(); // notify all waiting threads
    }

    public synchronized String consume() {
        while (this.data == null) {
            try {
                wait(); // suspend the thread if no data to consume
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        String data = this.data;
        this.data = null;
        notifyAll(); // notify all waiting threads
        return data;
    }
}

public class Producer implements Runnable {
    private final SharedObject sharedObject;

    public Producer(SharedObject sharedObject) {
        this.sharedObject = sharedObject;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            sharedObject.produce("Data " + i);
        }
    }
}

public class Consumer implements Runnable {
    private final SharedObject sharedObject;

    public Consumer(SharedObject sharedObject) {
        this.sharedObject = sharedObject;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Consumed: " + sharedObject.consume());
        }
    }
}

public class Main {
    public static void main(String[] args) {
        SharedObject sharedObject = new SharedObject();
        new Thread(new Producer(sharedObject)).start();
        new Thread(new Consumer(sharedObject)).start();
    }
}
The SharedObject class has a data field that's shared between a producer and a consumer. The producer produces data, and the consumer consumes it. If the producer tries to produce data when the previous data hasn't been consumed, it's suspended using the wait() method. Similarly, if the consumer tries to consume data when there's no data, it's suspended. When data is produced or consumed, all waiting threads are notified using notifyAll().
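The same hand-off can also be written with a BlockingQueue from java.util.concurrent, which performs the waiting and notifying internally. This is a swapped-in technique rather than the wait()/notifyAll() approach above; the class name BlockingQueueDemo is hypothetical, and the capacity of 1 is chosen to mirror the single data slot in SharedObject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical producer-consumer demo built on a bounded blocking queue.
class BlockingQueueDemo {
    static List<String> run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // one slot, like SharedObject
        List<String> consumed = new ArrayList<>();                 // written only by the consumer

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    queue.put("Data " + i);   // blocks while the slot is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    consumed.add(queue.take()); // blocks while the slot is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Data 0, Data 1, Data 2]
    }
}
```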
 

Thread Interruption

Thread interruption is a mechanism in Java that allows one thread to signal another thread to stop its current operation and move on to another operation. Essentially, an interrupt is a way to communicate between threads, typically to convey a message like "stop what you're doing and do something else" or simply "stop executing".
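A thread sends an interrupt by invoking interrupt() on the Thread object of the thread to be interrupted. Interruption is cooperative: the target thread must check its interrupt status periodically, either with the static method Thread.interrupted(), which returns true if an interrupt is pending and clears the flag, or with the instance method isInterrupted(), which leaves the flag set. Blocking methods such as Thread.sleep() and Object.wait() respond to an interrupt by throwing InterruptedException, which also clears the flag.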
An example use case could be a UI application where a user clicks a "Cancel" button to stop a long-running operation. The event handler for the button could interrupt the thread performing the operation.
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // Long running operation here.
            System.out.println("Thread is running...");
            try {
                Thread.sleep(1000); // Simulating a long-running operation.
            } catch (InterruptedException e) {
                // Thread was interrupted during sleep.
                Thread.currentThread().interrupt(); // Set the interrupt flag.
            }
        }
        System.out.println("Thread was interrupted and is stopping...");
    }
}

public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.start(); // Start the thread.
        try {
            Thread.sleep(5000); // Wait for 5 seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        thread.interrupt(); // Interrupt the thread.
    }
}
In this example, MyRunnable is a Runnable that runs a long operation in a loop. It checks the interrupt status of the thread in each iteration of the loop, and breaks out of the loop if the thread was interrupted.
In the main method, we start the MyRunnable in a new thread, wait for 5 seconds, and then interrupt the thread. After being interrupted, the thread stops its long-running operation and ends.
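The example restores the flag in the catch block because sleep() clears it when it throws InterruptedException. The difference between the two ways of checking the flag can be seen in a small single-threaded sketch; the class name InterruptFlagDemo is hypothetical.

```java
// Hypothetical demo: a thread interrupts itself and inspects its own interrupt flag.
class InterruptFlagDemo {
    static boolean[] check() {
        Thread.currentThread().interrupt();                       // set the flag
        boolean peek = Thread.currentThread().isInterrupted();    // reads the flag, leaves it set
        boolean first = Thread.interrupted();                     // reads the flag and clears it
        boolean second = Thread.interrupted();                    // the flag is already cleared
        return new boolean[] {peek, first, second};
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints true true false
    }
}
```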

Thread Priority

In Java, each thread is assigned a priority that affects the order in which it is scheduled to run. Thread priority is an integer value that ranges from Thread.MIN_PRIORITY (1) to Thread.MAX_PRIORITY (10). The default priority, Thread.NORM_PRIORITY, is 5.
When the thread scheduler decides which thread to run next, it generally prefers the runnable threads with the highest priority. Threads of equal priority are typically time-sliced so that each gets a turn, although Java does not guarantee a particular scheduling policy.
However, the exact behavior of thread priorities can depend on the underlying operating system, and it's not always guaranteed that higher-priority threads will always run before lower-priority ones. Therefore, it's usually best not to rely too much on thread priorities to control the behavior of a program.
A possible use case for thread priorities might be a multimedia application that needs to ensure smooth audio playback. The thread handling the audio output could be given a high priority to help prevent audio glitches.
public class HighPriorityThread extends Thread {
    public HighPriorityThread(String name) {
        super(name);
        setPriority(MAX_PRIORITY); // Set this thread to high priority
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            System.out.println(getName() + " is running, count: " + i);
        }
    }
}

public class LowPriorityThread extends Thread {
    public LowPriorityThread(String name) {
        super(name);
        setPriority(MIN_PRIORITY); // Set this thread to low priority
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            System.out.println(getName() + " is running, count: " + i);
        }
    }
}

public class Main {
    public static void main(String[] args) {
        HighPriorityThread highPriorityThread = new HighPriorityThread("HighPriorityThread");
        LowPriorityThread lowPriorityThread = new LowPriorityThread("LowPriorityThread");
        lowPriorityThread.start();
        highPriorityThread.start();
    }
}
In this example, HighPriorityThread and LowPriorityThread are both subclasses of Thread. They each override the run method to print a message 50 times. The HighPriorityThread is given a high priority by calling setPriority(MAX_PRIORITY), and the LowPriorityThread is given a low priority by calling setPriority(MIN_PRIORITY). In the main method, the threads are started with start(). The JVM's thread scheduler generally will give the HighPriorityThread more chances to run than the LowPriorityThread.
 

Thread Safety

Thread safety in programming means that a piece of code, method, class, or data structure is designed to function correctly when accessed by multiple threads concurrently. A piece of code is considered thread-safe if it only manipulates shared data structures in a manner that guarantees safe execution by multiple threads at the same time.
Take the simple operation num++. It may look like a single atomic operation, but it is not. It involves three separate steps:
  1. Read the current value of num.
  1. Increment the value by one.
  1. Write the new value back to num.
When these operations are conducted in a multi-threaded environment, there's a chance for a race condition — a situation where the behavior of the program depends on the relative timing of events, leading to erroneous results. Here's an example of what could happen with two threads:
  1. Thread A reads the value of num (let's say it's 0).
  1. Thread B reads the value of num (still 0).
  1. Thread A increments the value and writes it back to num (now it's 1).
  1. Thread B increments the value it read earlier (0) and writes it back to num.
Now num is 1, even though num++ was executed twice. This is a simple example of a race condition that can occur if num++ is not done in a thread-safe way.
To make num++ thread-safe, we can use synchronization mechanisms provided by the Java language. For example, we can use the synchronized keyword to ensure that these three operations are atomic, meaning they're executed as a single operation without being interrupted by other threads:
public class Counter {
    private int num = 0;

    public synchronized void increment() {
        num++;
    }
}

Synchronized In-depth

The synchronized keyword in Java is used to control access to critical sections of code that could cause data inconsistency if accessed by multiple threads concurrently.
The synchronized keyword can be used in the following ways:
  1. Synchronized Method: When a method is declared with the synchronized keyword, it's known as a synchronized method. Only one thread can access a synchronized method at a time. If a thread is in a synchronized method, all other threads that want to call any of the object's synchronized methods have to wait.
    1. public synchronized void increment() { num++; }
  1. Synchronized Block: Synchronized blocks in Java are marked with the synchronized keyword. A synchronized block can be used to mark a specific section of the method as synchronized, rather than the whole method. This can help to improve performance because only the necessary part of the code is synchronized.
    1. public void increment() { synchronized(this) { num++; } }
      In this example, the synchronized keyword is followed by an object in parentheses. This object is called a monitor object. The code inside the synchronized block is only accessible to a single thread that has obtained the lock on the monitor object.
Differences
  • Granularity: Synchronized methods synchronize the entire method and prevent multiple threads from executing the method concurrently. On the other hand, synchronized blocks provide more granular control over the code. Only the code present inside the block is synchronized.
  • Performance: Since synchronized blocks have a smaller scope compared to synchronized methods, they can lead to better performance in multithreaded applications.
  • Flexibility: Synchronized blocks are more flexible than synchronized methods because they allow you to choose which object's lock will be used for synchronization. This way, you can control the level of concurrency in your program more precisely. When you synchronize on this, you are using the object's intrinsic lock. But you can also use other objects for synchronization. This can help to avoid unnecessary contention if different synchronized blocks are independent and don't need to be locked by the same object.
Remember, inappropriate use of synchronized can cause issues such as deadlocks, so it's important to understand it well and apply it appropriately.
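To illustrate the flexibility point, the sketch below guards two independent counters with two separate monitor objects, so a thread updating one counter never waits for a thread updating the other. The class and field names (Stats, hitsLock, missesLock) are hypothetical.

```java
// Hypothetical class with two independent pieces of state, each with its own monitor object.
class Stats {
    private final Object hitsLock = new Object();
    private final Object missesLock = new Object();
    private int hits;
    private int misses;

    void recordHit() {
        synchronized (hitsLock) {    // does not contend with recordMiss()
            hits++;
        }
    }

    void recordMiss() {
        synchronized (missesLock) {  // does not contend with recordHit()
            misses++;
        }
    }

    int hits() {
        synchronized (hitsLock) {
            return hits;
        }
    }

    int misses() {
        synchronized (missesLock) {
            return misses;
        }
    }

    public static void main(String[] args) {
        Stats stats = new Stats();
        stats.recordHit();
        stats.recordHit();
        stats.recordMiss();
        System.out.println(stats.hits() + " hits, " + stats.misses() + " misses"); // 2 hits, 1 misses
    }
}
```

Had both methods been declared synchronized, they would share the lock on this and serialize against each other even though they touch different fields.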
 

Thread and Singleton

 
The Singleton pattern is a design pattern that restricts the instantiation of a class to a single instance and provides a global point of access to it. In other words, a singleton class is one which only allows a single instance of itself to be created, and usually gives simple access to that instance.

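Eager (Hungry) Singleton

The instance is created when the class is loaded. If it is never used, the resources it holds are wasted.
This approach is thread-safe, because the JVM guarantees that class initialization runs exactly once.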
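A minimal sketch of this style, with the hypothetical class name EagerSingleton:

```java
// Hypothetical eager singleton: the instance is built during class initialization.
class EagerSingleton {
    // Created once by the JVM when the class is initialized; no locking is needed.
    private static final EagerSingleton INSTANCE = new EagerSingleton();

    private EagerSingleton() {
        // Private constructor prevents instantiation from outside.
    }

    static EagerSingleton getInstance() {
        return INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // prints true
    }
}
```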

Lazy Singleton

A lazy initialization in Singleton means that the Singleton instance is not created until it's needed. This can save resources, particularly if the Singleton object is resource-intensive to create.
However, in a multi-threaded environment, a lazy-initialized Singleton can be tricky because multiple threads can potentially try to initialize the instance at the same time. If the Singleton isn't properly thread-safe, this can lead to multiple instances being created, violating the Singleton pattern.
public class Singleton {
    private static Singleton instance;

    private Singleton() {}

    public static Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }
}
This is not thread-safe because if two threads call getInstance() at the same time when instance is null, they both could pass the if (instance == null) check and create new instances of Singleton.
To make it thread-safe, we can add the synchronized keyword to the getInstance() method:
public class Singleton {
    private static Singleton instance;

    private Singleton() {}

    public static synchronized Singleton getInstance() {
        if (instance == null) {
            instance = new Singleton();
        }
        return instance;
    }
}
This is now thread-safe because only one thread can execute the getInstance() method at a time. However, it can be inefficient because once the Singleton instance is created, there's no need to synchronize the method anymore.
 
A more efficient way is to use "double-checked locking", which checks twice if the instance is null, once before synchronization and once within the synchronized block:
public class Singleton {
    private static volatile Singleton instance;

    private Singleton() {}

    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}
In this version, the lock is acquired only while instance is still null. Once the instance exists, the first check succeeds without synchronization, so later calls avoid the cost of locking. The second check inside the synchronized block prevents a thread that was waiting for the lock from creating a second instance.
The volatile keyword guarantees that a write to instance by one thread is visible to other threads, and it prevents the reference from being published before the constructor has finished.
Together, these keep the Singleton thread-safe while improving performance.
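Another commonly used lazy variant, not covered above, is the initialization-on-demand holder idiom. It relies on the JVM initializing a nested class only when it is first used, so it needs neither synchronized nor volatile. The names HolderSingleton and Holder are hypothetical.

```java
// Hypothetical lazy singleton using a nested holder class.
class HolderSingleton {
    private HolderSingleton() {
        // Private constructor prevents instantiation from outside.
    }

    // Holder is not initialized until getInstance() first touches Holder.INSTANCE.
    private static class Holder {
        static final HolderSingleton INSTANCE = new HolderSingleton();
    }

    static HolderSingleton getInstance() {
        return Holder.INSTANCE; // class initialization is thread-safe and runs once
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // prints true
    }
}
```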

How to prevent thread safety problems

Main cause for thread safety problems

  • A multi-threaded environment
  • Multiple threads accessing the same resources
  • Non-atomic operations performed on shared resources

How to prevent

  • A multi-threaded environment
    • Serialize access so that only one thread at a time touches the resource -> add a lock
  • Multiple threads accessing the same resources
    • Do not share resources
  • Non-atomic operations performed on shared resources
    • Make the operation atomic -> use a lock or the JDK's atomic classes (java.util.concurrent.atomic)
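One way to follow the "do not share resources" advice is to give each thread its own copy of the data, for example with ThreadLocal. In the sketch below each thread increments a private counter, so no lock is needed; the class name ThreadConfinementDemo is hypothetical.

```java
// Hypothetical demo: each thread works on its own counter instead of a shared one.
class ThreadConfinementDemo {
    // Every thread that calls get() receives its own int[1] holder.
    private static final ThreadLocal<int[]> LOCAL_COUNT =
            ThreadLocal.withInitial(() -> new int[1]);

    static int[] run() {
        int[] results = new int[2];
        Thread[] workers = new Thread[2];
        for (int i = 0; i < workers.length; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                int[] mine = LOCAL_COUNT.get();   // private to this thread
                for (int j = 0; j < 1000; j++) {
                    mine[0]++;                    // no other thread can see this value
                }
                results[id] = mine[0];            // each thread writes only its own slot
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints 1000 1000
    }
}
```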

Lock

Different type of locks

  1. Spinlock: When the time to access shared resources is short, and the frequency of context switching is high, it is not worth suspending and resuming threads. JVM implements a method to prevent the thread from being suspended when it does not acquire the lock and instead executes an empty loop. After looping a few times, if the lock is still not obtained, the thread is suspended.
  1. Blocking lock: This type of lock changes the running state of the thread, causing the thread to enter a blocked state and wait. When the corresponding signal is received (either a wake-up signal or a timer signal), the thread can enter a ready state. All threads in the ready state compete to enter the running state.
  1. Reentrant lock: A lock that the thread already holding it can acquire again without blocking, similar to having a key to a room that allows multiple entries.
  1. Read-write lock: Two locks, a read lock and a write lock. Write-write operations are mutually exclusive, read-write operations are mutually exclusive, and read-read operations can be shared.
  1. Mutex lock: Similar to going to the bathroom, once you enter the door, you close the door and do not allow others to enter.
  1. Pessimistic lock: Always assumes the worst case scenario. Each time data is accessed, it is assumed that others will modify it, so a lock is applied each time data is accessed. This way, others wanting to access this data will be blocked until they acquire the lock.
  1. Optimistic lock: Each time data is accessed, it is assumed that others will not modify it, so a lock is not applied. However, when updating, it checks whether others have updated this data in the meantime. This can be achieved using mechanisms such as versioning.
  1. Fair lock: Everyone lines up in an orderly manner, which is fair to everyone.
  1. Non-fair lock: Some people are in line, but newcomers may cut in line.
  1. Biased lock: A biased lock stays associated with the first thread that acquires it, so that thread can re-acquire it cheaply. The bias is revoked only when competition occurs, that is, when another thread tries to acquire the lock. Note that biased locking was deprecated and disabled by default in JDK 15.
  1. Exclusive lock: In the exclusive lock mode, only one thread can hold the lock at a time.
  1. Shared lock: Allows multiple threads to acquire the lock simultaneously, allowing concurrent access to shared resources.
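As an illustration of the first entry, a very small spinlock can be built from an AtomicBoolean and a CAS loop. This is only a sketch: it is not reentrant, it never falls back to suspending the thread, and it is not how the JVM implements spinning internally. The class name SpinLock is hypothetical, and Thread.onSpinWait() requires Java 9 or later.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, non-reentrant spinlock for illustration only.
class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        // Spin until this thread flips the flag from false to true.
        while (!locked.compareAndSet(false, true)) {
            Thread.onSpinWait(); // hint to the runtime that we are busy-waiting (Java 9+)
        }
    }

    void unlock() {
        locked.set(false);
    }

    // Starts `threads` threads that each increment a shared counter `perThread` times.
    static int run(int threads, int perThread) {
        SpinLock lock = new SpinLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++; // short critical section, where spinning is cheap
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```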

Simple implementation of Reentrant lock

public class SimpleReentrantLock {
    private Thread owner = null;
    private int lockCount = 0;

    public synchronized void lock() throws InterruptedException {
        Thread currentThread = Thread.currentThread();
        while (owner != null && owner != currentThread) {
            wait();
        }
        owner = currentThread;
        lockCount++;
    }

    public synchronized void unlock() {
        Thread currentThread = Thread.currentThread();
        if (currentThread == owner) {
            lockCount--;
            if (lockCount == 0) {
                owner = null;
                notify();
            }
        }
    }

    public static void main(String[] args) {
        SimpleReentrantLock lock = new SimpleReentrantLock();
        Runnable r = () -> {
            try {
                System.out.println("Thread starting: " + Thread.currentThread().getName());
                lock.lock();
                lock.lock(); // reentrant, should not block
                System.out.println("Thread inside critical section: " + Thread.currentThread().getName());
                Thread.sleep(1000);
                lock.unlock();
                lock.unlock(); // reentrant, should not throw an exception
                System.out.println("Thread ending: " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);
        t1.start();
        t2.start();
    }
}
In this example, the lock method will block if the lock is held by a different thread, and will immediately return if the lock is not held or is held by the current thread. The unlock method will only release the lock if the current thread is the owner, and will do nothing otherwise.
This is a very simple reentrant lock. Note that this does not handle many edge cases and complexities that a real-world lock would need to handle. For real-world use, you should generally use the ReentrantLock class provided by the Java standard library, which is well-tested and handles all these complexities for you.
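For reference, here is a small sketch of the standard-library class in use. It shows the usual lock/try/finally pattern and uses getHoldCount() to make the re-entry visible; the class name ReentrantLockDemo and its methods are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo of java.util.concurrent.locks.ReentrantLock.
class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock, then calls a method that acquires it again.
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock(); // always release in finally
        }
    }

    private int inner() {
        lock.lock(); // the same thread re-enters without blocking
        try {
            return lock.getHoldCount(); // number of holds by the current thread
        } finally {
            lock.unlock();
        }
    }

    boolean isFree() {
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        ReentrantLockDemo demo = new ReentrantLockDemo();
        System.out.println(demo.outer());  // prints 2
        System.out.println(demo.isFree()); // prints true
    }
}
```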

Brief Introduction to AQS

AbstractQueuedSynchronizer (often referred to as AQS) is a class provided by Java as part of the java.util.concurrent.locks package. It serves as the base for most of the synchronization and locking utilities in the Java Concurrency API. The class is abstract, meaning that it provides a skeletal implementation for developers to extend and use in their own synchronization classes.
The "queued" in AbstractQueuedSynchronizer refers to the fact that this class uses a FIFO (First-In-First-Out) wait queue to manage threads that are waiting for a state to change (such as a lock to be released). This queue is doubly-linked, which allows the removal of threads from the middle of the queue when they get interrupted or time out.
The "synchronizer" part refers to the fact that this class provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
Here's a high-level overview of how AQS works:
  1. State: AQS uses a single int to represent the state of the synchronizer (for a lock, this could represent whether it is held and how many times). Subclasses read the state with getState() and update it with setState(int newState) or compareAndSetState(int expect, int update).
  1. Acquire: When a thread wants to acquire a lock or change the state of the synchronizer, it calls one of the acquire methods (like acquire(int arg)). If the state change isn't immediately available (for example, if the lock is held by another thread), the acquire method adds the thread to the wait queue and parks it.
  1. Release: When a thread is done with the state or lock, it calls one of the release methods (like release(int arg)). The release method will change the state and if there are any waiting threads, it will unpark the first one in the queue.
  1. Condition Objects: AQS also provides a ConditionObject inner class that you can use to implement condition variables, which allow a thread to voluntarily give up a lock until a certain condition is met.
Notable classes that use AQS as their base include ReentrantLock, Semaphore, CountDownLatch, and ReentrantReadWriteLock. FutureTask was also built on AQS in older JDK releases, but its current implementation no longer uses it.
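The state, acquire, and release steps described above can be sketched as a small non-reentrant mutex built on AQS. This is an illustrative sketch, not how ReentrantLock itself is written; the names SimpleMutex and Sync are hypothetical, and the convention that state 0 means unlocked and 1 means locked is an assumption of this example.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class SimpleMutex {
    // Assumed convention for this sketch: state 0 = unlocked, 1 = locked.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Atomically flip 0 -> 1; on failure AQS queues and parks the caller.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0 || getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // returning true lets AQS unpark the next queued thread
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    boolean tryLock() { return sync.tryAcquire(1); }
    void unlock() { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                mutex.lock();
                try {
                    counter[0]++;
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter[0]); // prints 20000
    }
}
```

The subclass only decides whether the state change is allowed; queuing, parking, and waking threads are handled by acquire(int) and release(int) in AQS.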

ReentrantReadWriteLock

ReentrantReadWriteLock is a class in Java that provides a pair of associated locks, one for read-only operations and another one for writing. The read lock may be held simultaneously by multiple reader threads, as long as there are no writers. The write lock is exclusive, meaning that it can only be held by one thread at a time and all read and write requests block until the write lock is released.
This lock allows multiple threads to read a resource concurrently, but if a thread wants to write or modify the resource, it must first acquire the write lock. This ensures that the write operation is done atomically, while still allowing high concurrency during read operations.
 
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Counter {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int count = 0;

    // use read lock for getting the value
    public int getCount() {
        lock.readLock().lock();
        try {
            return count;
        } finally {
            lock.readLock().unlock();
        }
    }

    // use write lock for updating the value
    public void incrementCount() {
        lock.writeLock().lock();
        try {
            count++;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
Multiple threads can call getCount() concurrently, but if a thread calls incrementCount(), it will have exclusive access until it releases the write lock.
A couple of things to note:
  • ReentrantReadWriteLock is "reentrant" because each lock that it provides, the read lock and the write lock, can be held by the same thread more than once. That is, the thread can acquire the lock, perform some work, and then acquire the lock again (re-enter the lock) if it needs to perform more work. When the thread is done, it must release the lock as many times as it acquired it.
  • Also, ReentrantReadWriteLock provides a fairness policy, which can be either 'fair' or 'unfair'. A fair lock favors granting access to the longest-waiting thread. In contrast, an unfair lock makes no guarantees about the order in which threads are granted access. By default, ReentrantReadWriteLock is unfair, which generally has higher throughput but less predictability.
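The two notes above can be illustrated with a short sketch: it constructs a fair lock and uses reentrancy to downgrade from the write lock to the read lock. The class CachedValue and its methods are hypothetical; the constructor ReentrantReadWriteLock(boolean fair), isFair(), and getReadHoldCount() are JDK methods. Note that the reverse direction, upgrading from the read lock to the write lock, is not supported.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CachedValue {
    // Passing true requests the fair ordering policy; the no-arg constructor is non-fair.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private int value;

    // Writes a new value, then downgrades to the read lock before reading it back.
    int updateAndRead(int newValue) {
        lock.writeLock().lock();
        try {
            value = newValue;
            // Downgrade: take the read lock while still holding the write lock.
            lock.readLock().lock();
        } finally {
            lock.writeLock().unlock();
        }
        try {
            return value; // other readers may now proceed concurrently
        } finally {
            lock.readLock().unlock();
        }
    }

    boolean isFair() {
        return lock.isFair();
    }

    // Read holds of the calling thread; 0 once every lock() has been matched by unlock().
    int readHolds() {
        return lock.getReadHoldCount();
    }

    public static void main(String[] args) {
        CachedValue cache = new CachedValue();
        System.out.println(cache.updateAndRead(42)); // prints 42
        System.out.println(cache.isFair());          // prints true
    }
}
```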

Thread Communication

Wait, notify, notifyAll

The wait and notify (or notifyAll) methods in Java are used for inter-thread communication, particularly when different threads need to coordinate their actions or when one thread needs to wait for another thread to signal that it has completed a particular task. These methods are part of the Object class in Java and are used in conjunction with synchronized blocks or methods.
public class Shared {
    private String data;
    private boolean empty = true;

    public synchronized void produce(String data) {
        while (!empty) {
            try {
                wait();
            } catch (InterruptedException e) { /* handle exception */ }
        }
        this.data = data;
        empty = false;
        notifyAll();
    }

    public synchronized String consume() {
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) { /* handle exception */ }
        }
        empty = true;
        notifyAll();
        return data;
    }
}

public class Producer implements Runnable {
    private Shared shared;

    public Producer(Shared shared) {
        this.shared = shared;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            shared.produce("Data-" + i);
        }
    }
}

public class Consumer implements Runnable {
    private Shared shared;

    public Consumer(Shared shared) {
        this.shared = shared;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println(shared.consume());
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Shared shared = new Shared();
        Thread producerThread = new Thread(new Producer(shared));
        Thread consumerThread = new Thread(new Consumer(shared));
        producerThread.start();
        consumerThread.start();
    }
}
  • The Shared class represents a shared object that holds the data being passed between the producer and the consumer. It has two methods, produce and consume, which are synchronized to ensure that they can't be called concurrently by different threads.
  • The produce method waits until the shared object is empty, then it produces some data, stores it in the shared object, and calls notifyAll to wake up any threads that are waiting for the shared object to be filled.
  • The consume method waits until the shared object is filled, then it consumes the data, empties the shared object, and calls notifyAll to wake up any threads that are waiting for the shared object to be emptied.
  • The Producer and Consumer classes implement Runnable and represent the producer and consumer threads, respectively. Each thread has a reference to the shared object and repeatedly calls produce or consume.
  • In the main method, a Shared object, a Producer thread and a Consumer thread are created, and the threads are started. The producer and consumer work concurrently, passing data back and forth through the shared object.

Pipelining

Pipelining is a form of parallel processing in which multiple threads operate on a data stream, with each thread responsible for a distinct stage of the data processing. It's essentially a mechanism of processing where the output of one thread becomes the input for the next thread.
PipedInputStream and PipedOutputStream are used in Java for creating a communication pipe between two threads where one thread can write data to the PipedOutputStream, and the other thread can read that data from the PipedInputStream.
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeExample {
    public static void main(String[] args) throws IOException {
        final PipedOutputStream output = new PipedOutputStream();
        final PipedInputStream input = new PipedInputStream(output);

        Thread writerThread = new Thread(() -> {
            try {
                output.write("Hello, world!".getBytes());
                output.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        Thread readerThread = new Thread(() -> {
            try {
                int data = input.read();
                while (data != -1) {
                    System.out.print((char) data);
                    data = input.read();
                }
                input.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        writerThread.start();
        readerThread.start();
    }
}
  1. We create a PipedOutputStream (output) and a PipedInputStream (input) connected to the output.
  1. The writerThread writes a string into the output stream.
  1. The readerThread reads from the input stream, which is connected to the output stream. It reads byte by byte until it gets -1, which signifies the end of the stream, and prints each byte as a char to the console.
  1. We start both threads. The order in which they are scheduled does not matter: input.read() blocks until the writer has written data or closed the stream, so the reader does not miss any output. Do not make the reader wait for the writer with Thread.join() before it starts reading; once the written data exceeds the pipe's buffer, the writer blocks until the reader drains it, and the two threads would wait on each other.

PipedInputStream and PipedOutputStream Limitations

Using PipedInputStream and PipedOutputStream is a somewhat low-level way of doing inter-thread communication in Java, and there are several potential pitfalls and limitations to be aware of:
  1. Blocking behavior: Both reading and writing to a pipe are blocking operations. If you try to read from a PipedInputStream and there's no data, the thread will block until some data is written. Similarly, if the PipedOutputStream's internal buffer is full (because the data hasn't been read yet), a thread trying to write will block until some data is read and space becomes available in the buffer.
  1. Buffer size limitation: PipedInputStream/PipedOutputStream uses a relatively small fixed-size buffer (the default size is 1KB in Java 8). If you need to pass large amounts of data between threads, or if the producer thread can generate data much faster than the consumer thread can process it, you might run into problems with the buffer filling up.
  1. Need for proper synchronization: These classes require careful synchronization to avoid data corruption or other problems. For example, you should not attempt to use the same PipedInputStream or PipedOutputStream from multiple threads without proper synchronization.
  1. Deadlock risk: Reading from and writing to the same pipe in a single thread can deadlock that thread, because a read blocks until data arrives and a write blocks while the buffer is full. Keep the reading and writing ends in separate threads.
  1. IOException on thread interruption: A thread blocked in PipedInputStream.read() or PipedOutputStream.write() throws an InterruptedIOException (a subclass of IOException) if it is interrupted. So if your code uses interruption to cancel tasks or shut down threads, you'll need to handle this properly.
  1. One-to-one pairing: Each PipedOutputStream must be connected to exactly one PipedInputStream and vice versa. If you need to distribute data from one thread to multiple consumers, or collect data from multiple producers into one thread, you'll need to set up multiple pipes and manage them carefully.
Given these potential issues, for many common cases it might be easier and safer to use higher-level concurrency utilities like BlockingQueue, ExecutorService, or CompletableFuture, which take care of many of these details for you.
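As a rough illustration of the BlockingQueue alternative, the sketch below hands messages from one thread to another through an ArrayBlockingQueue. The class QueueHandoff, the method transfer, and the DONE end-of-stream marker are hypothetical choices for this example; the capacity of 2 is arbitrary and only there to show that put() blocks when the queue is full.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueHandoff {
    // Hypothetical marker telling the consumer that no more messages will arrive.
    private static final String DONE = "__done__";

    static List<String> transfer(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) {
                    queue.put(m); // blocks while the queue is full
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (String m = queue.take(); !m.equals(DONE); m = queue.take()) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes to 'received' are visible here
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(Arrays.asList("Hello", "world"))); // prints [Hello, world]
    }
}
```

Unlike a pipe, the queue carries whole objects, supports several producers and consumers, and reports interruption as InterruptedException rather than an I/O error.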

Thread.join

Thread.join(): This method allows one thread to wait until another thread completes its execution. If t is a Thread object whose thread is currently executing, then t.join() blocks the calling thread until t has terminated. Several threads may call join() on the same thread; all of them block until that thread terminates.
Thread thread1 = new Thread(() -> {
    System.out.println("Start thread1.");
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("End thread1.");
});

Thread thread2 = new Thread(() -> {
    System.out.println("Start thread2.");
    try {
        thread1.join(); // wait for thread1 to finish
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("End thread2.");
});

thread1.start();
thread2.start();
In this code, thread1 sleeps for 2 seconds, and thread2 starts and immediately tries to join with thread1. Because of the join() call, thread2 will wait for thread1 to finish before it prints "End thread2."
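A common use of join() is to wait for several worker threads and then combine their results. The sketch below, with the hypothetical class JoinExample and method sumInParallel, splits an array between two threads. It relies on the guarantee that everything a thread did is visible to the caller once join() on that thread returns.

```java
class JoinExample {
    // Sums the two halves of 'data' in separate threads and combines the results.
    static int sumInParallel(int[] data) throws InterruptedException {
        int mid = data.length / 2;
        int[] partial = new int[2]; // each worker writes only its own slot

        Thread left = new Thread(() -> {
            for (int i = 0; i < mid; i++) {
                partial[0] += data[i];
            }
        });
        Thread right = new Thread(() -> {
            for (int i = mid; i < data.length; i++) {
                partial[1] += data[i];
            }
        });

        left.start();
        right.start();
        left.join();  // blocks until 'left' has terminated
        right.join(); // blocks until 'right' has terminated
        return partial[0] + partial[1];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumInParallel(new int[] {1, 2, 3, 4, 5})); // prints 15
    }
}
```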
 

ThreadLocal

ThreadLocal is a class in Java that allows you to create variables that can only be read and written by the same thread. Even if two threads are executing the same code, and the code has a reference to a ThreadLocal object, the two threads cannot see each other's ThreadLocal variables.
ThreadLocal instances are typically private static fields in classes that wish to associate state with a thread (such as a user ID or transaction ID). Here is a simple example of how to use ThreadLocal:
public class Example {
    private static ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() {
            return (int) (Math.random() * 100);
        }
    };

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    System.out.println("Thread's ID: " + threadId.get());
                } finally {
                    threadId.remove();
                }
            }).start();
        }
    }
}
In this example, each thread gets its own randomly generated ID, because threadId.get() returns the value that belongs to the calling thread. The IDs are independent per thread but not guaranteed to be unique, since two threads can draw the same random number.
Note that although ThreadLocal can be a powerful tool for associating per-thread state with code that is concurrent, it should be used with care. ThreadLocal variables are common sources of memory leaks in Java web applications if they are not cleaned up after use: each thread keeps its ThreadLocal values in a per-thread map, so a value stays reachable for as long as that thread is alive. This is particularly a problem in environments where threads are reused, such as thread pools, because a pooled thread may never terminate and a stale value can also be seen by the next task that runs on it.
To avoid such memory leaks, any ThreadLocal variables should be cleared once they are not needed anymore. This can be done by calling the remove() method on ThreadLocal.
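The cleanup pattern can be sketched as follows for a reused pool thread. RequestContext, handle, currentUser, and the "anonymous" default are hypothetical names for this example; ThreadLocal.withInitial, set, get, and remove are the JDK methods being shown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RequestContext {
    // Each thread that has no value set sees the default "anonymous".
    private static final ThreadLocal<String> USER = ThreadLocal.withInitial(() -> "anonymous");

    static String handle(String user) {
        USER.set(user);
        try {
            return "handled for " + USER.get();
        } finally {
            USER.remove(); // clear the value so the pooled thread does not keep it
        }
    }

    static String currentUser() {
        return USER.get();
    }

    public static void main(String[] args) throws Exception {
        // A single-thread pool guarantees both tasks run on the same reused thread.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(pool.submit(() -> handle("alice")).get()); // prints handled for alice
            System.out.println(pool.submit(() -> currentUser()).get());   // prints anonymous
        } finally {
            pool.shutdown();
        }
    }
}
```

Without the remove() call in the finally block, the second task would see "alice", the value left behind by the previous task on the same thread.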

Condition

java.util.concurrent.locks.Condition is used with a Lock to manage concurrent access to a shared resource. A Condition instance provides methods to await (causing the current thread to be disabled for thread scheduling purposes and to lie dormant until it's signaled) and signal (waking up one or more threads that are waiting on this condition).
The primary advantage of Condition over the basic Object.wait and Object.notify methods is the ability to have multiple Condition objects per lock, giving you more granular control over which threads get woken up.
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerExample {
    LinkedList<Integer> buffer = new LinkedList<>();
    int capacity = 4;
    ReentrantLock lock = new ReentrantLock();
    Condition notEmpty = lock.newCondition();
    Condition notFull = lock.newCondition();

    // Producer class
    class Producer implements Runnable {
        public void run() {
            try {
                int value = 0;
                while (true) {
                    lock.lock();
                    try {
                        while (buffer.size() == capacity) {
                            notFull.await();
                        }
                        System.out.println("Producer produced-" + value);
                        buffer.add(value++);
                        notEmpty.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Consumer class
    class Consumer implements Runnable {
        public void run() {
            try {
                while (true) {
                    lock.lock();
                    try {
                        while (buffer.isEmpty()) {
                            notEmpty.await();
                        }
                        int value = buffer.removeFirst();
                        System.out.println("Consumer consumed-" + value);
                        notFull.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void start() {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }

    public static void main(String[] args) {
        new ProducerConsumerExample().start();
    }
}
  • The Condition instances notEmpty and notFull are used to manage the state of the buffer.
  • When the buffer is full, the producer calls notFull.await() to wait until space is available. When the consumer removes an item from the buffer, it calls notFull.signal() to notify the producer that space is available.
  • Similarly, when the buffer is empty, the consumer calls notEmpty.await() to wait until an item is available. When the producer adds an item to the buffer, it calls notEmpty.signal() to notify the consumer that an item is available.
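This is a classic example of using Condition for inter-thread communication. Here each signal() corresponds to exactly one item added or removed, so waking a single waiting thread is enough. If the application needs to wake every waiting thread, for example on shutdown or when several items become available at once, signalAll() can be used instead of signal().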

Atomic

Atomic classes in Java, which are part of the java.util.concurrent.atomic package, are used for performing atomic operations: operations that complete as a single unit of work without the possibility of interference from other threads. Each individual atomic operation is thread-safe and can be used in multithreaded code without additional synchronization. A sequence of several atomic calls is not atomic as a whole, however, so compound updates still need a single atomic method (such as compareAndSet or updateAndGet) or a lock.
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
    private AtomicInteger counter = new AtomicInteger(0);

    public void incrementCounter() {
        counter.incrementAndGet();
    }

    public int getCounter() {
        return counter.get();
    }
}
In this example, counter is an AtomicInteger. The incrementAndGet method atomically increments the current value by one and returns the updated value, which ensures that the increment operation is thread-safe.
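To see the effect under contention, the sketch below increments one AtomicInteger from several threads and also shows a compare-and-set retry loop for an update that depends on the current value. AtomicDemo, countWithThreads, and doubleValue are hypothetical names for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicDemo {
    // Increments a shared AtomicInteger from several threads and returns the total.
    static int countWithThreads(int threads, int perThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        return counter.get();
    }

    // A compound update (read, compute, write) made atomic with a CAS retry loop.
    static int doubleValue(AtomicInteger n) {
        while (true) {
            int current = n.get();
            int next = current * 2;
            // Succeeds only if no other thread changed the value in between.
            if (n.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 1000));          // prints 4000
        System.out.println(doubleValue(new AtomicInteger(21))); // prints 42
    }
}
```

With a plain int and count++ in place of incrementAndGet(), the first result could come out lower than 4000 because concurrent increments can overwrite each other.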
 
import java.util.concurrent.atomic.LongAccumulator;

public class AccumulatorExample {
    private LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);

    public void accumulate(long value) {
        accumulator.accumulate(value);
    }

    public long getAccumulator() {
        return accumulator.get();
    }
}
In this example, accumulator is a LongAccumulator that is initialized to Long.MIN_VALUE and uses Long::max as the accumulation function. The accumulate method atomically updates the current value with the maximum of the current value and the given value, which ensures that the operation is thread-safe.
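As a usage sketch, several threads can feed values into one LongAccumulator and the result is read after all of them have finished. MaxTracker and maxOf are hypothetical names; the input is assumed to be pre-split into one chunk per thread.

```java
import java.util.concurrent.atomic.LongAccumulator;

class MaxTracker {
    // Finds the maximum across all chunks, processing each chunk in its own thread.
    static long maxOf(long[][] chunks) throws InterruptedException {
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        Thread[] workers = new Thread[chunks.length];
        for (int i = 0; i < chunks.length; i++) {
            long[] chunk = chunks[i];
            workers[i] = new Thread(() -> {
                for (long v : chunk) {
                    max.accumulate(v); // thread-safe update with Long::max
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        // Read after all workers are done, so no updates are in flight.
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(maxOf(new long[][] {{1, 5}, {9, 2}, {3}})); // prints 9
    }
}
```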