Concurrently Programming Challenges
Concurrent programming presents several challenges, mainly stemming from the fact that multiple threads or processes are executing simultaneously, potentially sharing resources and data. Some of the main challenges include:
- Race conditions: When two or more threads access shared data simultaneously, the final value of the data might depend on the order in which the threads are executed. This can lead to unpredictable results and hard-to-reproduce bugs.
- Deadlocks: A situation where two or more threads are stuck waiting for each other to release a resource (e.g., a lock), causing the program to hang indefinitely. Deadlocks usually occur when multiple threads acquire locks in an inconsistent order.
- Livelocks: A livelock is similar to a deadlock, but in this case, threads are not completely blocked; they are just unable to make progress due to the constant change of states. A livelock can occur when two or more threads are stuck in a loop, continuously responding to each other's actions but not making any progress.
- Starvation: A situation where a thread is unable to make progress because it is constantly waiting for a shared resource that is always in use by other threads. Starvation can lead to reduced performance or even application crashes.
- Resource contention: When multiple threads compete for the same limited resources (e.g., memory, CPU time, or I/O bandwidth), it can lead to contention and significantly impact the performance of the application.
- Correctness and consistency: Ensuring that concurrent operations maintain the correctness and consistency of shared data structures can be challenging, as different threads may have different views of the shared state at any given time.
- Complexity: Concurrent programming often introduces additional complexity to the code, making it harder to read, understand, and maintain. Debugging and testing concurrent code can also be more challenging due to the non-deterministic nature of thread execution.
Context switching
In a multitasking operating system, the scheduler manages the execution of multiple
processes
or threads
by allocating time slices
(also called "quantum") to each of them. When the allocated time slice for a process or thread expires
or when a higher-priority task
becomes available, the operating system performs a context switch to save
the current state
of the running process and restore
the state of the next process
to be executed. The saved state includes the values of CPU registers
, program counter
, stack pointer
, and other relevant information.Â
Context switching has some
overhead
, as the operating system needs to save and restore the state of processes or threads. This overhead can impact the performance of the system, especially if the rate of context switching is high. However, context switching is a crucial feature for multitasking systems, as it allows efficient sharing of resources and provides the illusion of concurrent execution of multiple processes or threads.CAS
The Java Atomic package uses the Compare and Swap (CAS) algorithm to sychornize atomic operations particularly in lock-free data structures and algorithms.
- CAS takes three arguments:
memory location
,expected value
andnew value
. The algorithm compare the value at the memory location with the expected value; if theymatch
, the new value is written to the memory location. The entire operation isatomic
, meaning it cannot be interrupted by other threads, ensuringconsistency
andcorrectness
in concurrent environments.
- Here's a high-level description of the CAS algorithm:
- Read the current value at the memory location.
- Compare the current value with the expected value.
- If the values match, update the memory location with the new value.
- Return a boolean indicating whether the operation was successful.
CAS requires no locking mechanism and requires the least threads for the given tasks.
Deadlock Demo
public class DeadLockDemo { private static final Object HAIR_A = new Object(); private static final Object HAIR_B = new Object(); public static void main(String[] args) { new Thread(() -> { synchronized (HAIR_A) { try { Thread.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (HAIR_B) { System.out.println("A successfully grabbed B's hair"); } } }).start(); new Thread(() -> { synchronized (HAIR_B) { synchronized (HAIR_A) { System.out.println("B successfully grabbed A's hair"); } } }).start(); } }
- Thread 1 acquires the lock on
HAIR_A
by entering thesynchronized (HAIR_A)
block.
- Thread 1 sleeps for 50 milliseconds to simulate some processing time.
- Meanwhile, Thread 2 acquires the lock on
HAIR_B
by entering thesynchronized (HAIR_B)
block.
- Thread 1 wakes up from sleep and tries to acquire the lock on
HAIR_B
by entering thesynchronized (HAIR_B)
block. However, it can't proceed because Thread 2 holds the lock onHAIR_B
.
- At the same time, Thread 2 tries to acquire the lock on
HAIR_A
by entering thesynchronized (HAIR_A)
block. It can't proceed because Thread 1 holds the lock onHAIR_A
.
- As a result, both threads are stuck waiting for each other to release the respective locks, and neither can proceed. This situation leads to a deadlock.
Thread Safety
import java.util.concurrent.CountDownLatch; public class UnSafeThread { private static int num = 0; private static CountDownLatch countDownLatch = new CountDownLatch(10); /** * Increment num by 1 */ public static void increment() { num++; } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(() -> { for (int j = 0; j < 100; j++) { increment(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } countDownLatch.countDown(); }).start(); } while (true) { if (countDownLatch.getCount() == 0) { System.out.println(num); break; } } } }
- The
main
method creates 10 threads in a loop.
- Each thread acquires a "virtual" lock on the shared
num
variable by entering theincrement()
method (although no actual synchronization mechanism is used).
- Each thread increments the
num
variable by 1.
- Each thread sleeps for 10 milliseconds to simulate some processing time.
- Meanwhile, other threads are being created and run concurrently, also incrementing the
num
variable and sleeping for 10 milliseconds.
- As the threads access the shared
num
variable without any synchronization,race conditions
occur, causing the final value ofnum
to be unpredictable.
- After each thread completes its execution, it calls the
countDown()
method on theCountDownLatch
.
- The
main
thread is continuously checking the count of theCountDownLatch
in a while loop.
- When the count of the
CountDownLatch
reaches 0, indicating that all threads have completed their execution, themain
thread breaks out of the while loop.
- The
main
thread prints the final value of thenum
variable, which is the result of all threads' concurrent operations.
Basic Thread knowledge
Process vs Thread
Processes and threads are both ways to achieve concurrent execution in operating systems, but they have some important differences:
Process:
A process is an
instance
of a program
that is being executed
. Each process has its own separate memory space
and does not share it with other processes. Because of this, processes are often called "heavyweight
" or costly, as switching between different processes requires some time (a context switch) because the system needs to store and restore memory maps, registers, etc.Processes can
communicate
with each other
, but this communication requires some setup and coordination, as the operating system needs to provide some communication mechanism
such as pipes or sockets.Examples of process-based multitasking include running a web browser, a text editor, and a database system on a computer at the same time. Each of these tasks runs in its own process.
Thread:
A thread, also known as a lightweight process, is a
unit of execution
within a process. All threads within the same process share the same memory space
, which means they can access the same variables and data structures, making communication
between threads very efficient
.However, this shared memory model also necessitates
careful coordination
when accessing shared data, as multiple threads can interfere with each other (a situation known as a race condition). To prevent this, various synchronization
techniques, such as mutexes, semaphores, and condition variables, can be used.Threads are useful when
different tasks
can be performed concurrently
within the same program, like handling multiple connections in a server or updating the user interface while performing background computations in a desktop application.Summary
In summary, the primary difference between a process and a thread is that processes are isolated from each other with their own memory space, while threads share memory space but require synchronization to manage access to shared data.
Â
Thread states
A thread can be in one of the following states during its lifecycle:
- New: The thread is created but not yet started.
- Runnable (or Running): The thread might be running or ready to run at any instant of time. It's the responsibility of the thread scheduler to give the thread time to run.
- Blocked (or Waiting): The thread is alive, but currently not eligible to run due to awaiting some external processing such as file I/O to complete.
- Waiting: A thread is in a blocked state while it waits for another thread to perform a particular action.
- Timed Waiting: A thread is in the timed waiting state due to calling a method with a specified time limit.
- Terminated (or Dead): A thread is in a terminated state when its run() method exits.
stateDiagram [*] --> New: Thread created New --> Runnable: Thread started state fork_state <<fork>> Runnable --> fork_state fork_state --> Running: Scheduler selects thread fork_state --> Blocked: Awaits resource fork_state --> Waiting: Awaits another thread fork_state --> TimedWaiting: Thread sleeps Running --> Dead: run() method completes Dead --> [*] Blocked --> Runnable: Resource available Waiting --> Runnable: Other thread signals TimedWaiting --> Runnable: Sleep time expires or gets interrupted
Â
Creating new thread
public class MyRunnable implements Runnable { @Override public void run() { // code to execute in the new thread for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getId() +" Value "+ i); try { // Sleep for a while Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Main { public static void main(String[] args) { Thread thread1 = new Thread(new MyRunnable()); thread1.start(); // starts the thread Thread thread2 = new Thread(new MyRunnable()); thread2.start(); // starts the second thread } } // Thread pool method public class Main { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(2); // creating a pool of 2 threads Runnable worker = new MyRunnable(); executor.execute(worker); // calling execute method of ExecutorService executor.execute(worker); executor.shutdown(); // shut down executor while (!executor.isTerminated()) { } System.out.println("Finished all threads"); } }
Creating new thread using runnable instead of
extends Thread
as java only allows single inheritance. but you can implement multiple interface
. This is good for code cleanliness
- In the
main
method of theMain
class, we create anExecutorService
with a fixed thread pool of 2 threads usingExecutors.newFixedThreadPool(2)
.
- Then we create an instance of
MyRunnable
, which we pass to theexecute
method of theExecutorService
. This method starts a new thread and calls therun
method ofMyRunnable
in that thread.
- The
shutdown
method is called on theExecutorService
to stop accepting new tasks and to allow previously submitted tasks to complete.
- The
while
loop checks if all tasks have completed execution.
- Once all threads have completed execution, it prints "Finished all threads".
Thread suspension and recover
Thread suspension is a process where a thread's execution is temporarily stopped or paused. The suspended thread can be restarted at a later point in time, a process known as thread recovery or resumption.
However, as of Java SE 11, methods such as
Thread.suspend()
, Thread.resume()
, and Thread.stop()
have been deprecated due to their inherent deadlock risks. A suspended thread that holds shared resources might never release them if it's not resumed, causing deadlocks. Similarly, abruptly stopping threads with Thread.stop()
can leave shared data in an inconsistent state.Despite these methods being deprecated, the concepts of thread suspension and resumption are still applicable in concurrent programming. A common and safe way to suspend and resume a thread in modern Java applications involves using
wait()
, notify()
, or notifyAll()
methods provided by the Object class.A good use case for thread suspension and recovery might be in a
producer-consumer
scenario, where you want the consumer to wait
(suspend) when there's no data
to consume and to continue
(recover) when new data arrives
.public class SharedObject { private String data = null; public synchronized void produce(String data) { while (this.data != null) { try { wait(); // suspend the thread if data already exists } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } this.data = data; notifyAll(); // notify all waiting threads } public synchronized String consume() { while (this.data == null) { try { wait(); // suspend the thread if no data to consume } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } String data = this.data; this.data = null; notifyAll(); // notify all waiting threads return data; } } public class Producer implements Runnable { private final SharedObject sharedObject; public Producer(SharedObject sharedObject) { this.sharedObject = sharedObject; } @Override public void run() { for (int i = 0; i < 10; i++) { sharedObject.produce("Data " + i); } } } public class Consumer implements Runnable { private final SharedObject sharedObject; public Consumer(SharedObject sharedObject) { this.sharedObject = sharedObject; } @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println("Consumed: " + sharedObject.consume()); } } } public class Main { public static void main(String[] args) { SharedObject sharedObject = new SharedObject(); new Thread(new Producer(sharedObject)).start(); new Thread(new Consumer(sharedObject)).start(); } }
The
SharedObject
class has a data
field that's shared between a producer and a consumer. The producer produces data, and the consumer consumes it. If the producer tries to produce data when the previous data hasn't been consumed, it's suspended using the wait()
method. Similarly, if the consumer tries to consume data when there's no data, it's suspended. When data is produced or consumed, all waiting threads are notified using notifyAll()
.Â
Thread Interruption
Thread interruption is a mechanism in Java that allows one thread to signal another thread to stop its current operation and move on to another operation. Essentially, an interrupt is a way to communicate between threads, typically to convey a message like "stop what you're doing and do something else" or simply "stop executing".
A thread sends an interrupt by invoking
interrupt()
on the Thread
object for the thread to be interrupted. For the interrupted thread to respond to the interrupt, the thread must be coded to periodically check the interrupted status by invoking the static method Thread.interrupted()
, which returns true
if an interrupt has been received.An example use case could be a UI application where a user clicks a "Cancel" button to stop a long-running operation. The event handler for the button could interrupt the thread performing the operation.
public class MyRunnable implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { // Long running operation here. System.out.println("Thread is running..."); try { Thread.sleep(1000); // Simulating a long-running operation. } catch (InterruptedException e) { // Thread was interrupted during sleep. Thread.currentThread().interrupt(); // Set the interrupt flag. } } System.out.println("Thread was interrupted and is stopping..."); } } public class Main { public static void main(String[] args) { Thread thread = new Thread(new MyRunnable()); thread.start(); // Start the thread. try { Thread.sleep(5000); // Wait for 5 seconds } catch (InterruptedException e) { Thread.currentThread().interrupt(); } thread.interrupt(); // Interrupt the thread. } }
In this example,
MyRunnable
is a Runnable
that runs a long operation in a loop. It checks the interrupt status of the thread in each iteration of the loop, and breaks out of the loop if the thread was interrupted.In the
main
method, we start the MyRunnable
in a new thread, wait for 5 seconds, and then interrupt the thread. After being interrupted, the thread stops its long-running operation and ends.Thread Priority
In Java, each thread is assigned a priority that affects the order in which it is scheduled to run. Thread priority is an integer value that ranges from
Thread.MIN_PRIORITY
(1) to Thread.MAX_PRIORITY
(10). The default priority, Thread.NORM_PRIORITY
, is 5.When the thread scheduler decides which thread to run next, it generally selects from the runnable threads with the highest priority. If multiple threads have the same priority, the scheduler uses a round-robin strategy, giving each thread a chance to run in turn.
However, the exact behavior of thread priorities can depend on the underlying operating system, and it's not always guaranteed that higher-priority threads will always run before lower-priority ones. Therefore, it's usually best not to rely too much on thread priorities to control the behavior of a program.
A possible use case for thread priorities might be a multimedia application that needs to ensure smooth audio playback. The thread handling the audio output could be given a high priority to help prevent audio glitches.
public class HighPriorityThread extends Thread { public HighPriorityThread(String name) { super(name); setPriority(MAX_PRIORITY); // Set this thread to high priority } @Override public void run() { for (int i = 0; i < 50; i++) { System.out.println(getName() + " is running, count: " + i); } } } public class LowPriorityThread extends Thread { public LowPriorityThread(String name) { super(name); setPriority(MIN_PRIORITY); // Set this thread to low priority } @Override public void run() { for (int i = 0; i < 50; i++) { System.out.println(getName() + " is running, count: " + i); } } } public class Main { public static void main(String[] args) { HighPriorityThread highPriorityThread = new HighPriorityThread("HighPriorityThread"); LowPriorityThread lowPriorityThread = new LowPriorityThread("LowPriorityThread"); lowPriorityThread.start(); highPriorityThread.start(); } }
In this example,
HighPriorityThread
and LowPriorityThread
are both subclasses of Thread
. They each override the run
method to print a message 50 times. The HighPriorityThread
is given a high priority by calling setPriority(MAX_PRIORITY)
, and the LowPriorityThread
is given a low priority by calling setPriority(MIN_PRIORITY)
. In the main
method, the threads are started with start()
. The JVM's thread scheduler generally will give the HighPriorityThread
more chances to run than the LowPriorityThread
.Â
Thread Safety
Thread safety in programming means that a piece of code, method, class, or data structure is designed to function correctly when
accessed
by multiple threads concurrently
. A piece of code is considered thread-safe if it only manipulates shared data structures in a manner that guarantees
safe execution
by multiple threads
at the same time.Take a simple operation
num++
, it may seem like a simple, atomic operation, but it's actually not. This operation involves three separate steps:- Read the current value of
num
.
- Increment the value by one.
- Write the new value back to
num
.
When these operations are conducted in a multi-threaded environment, there's a chance for a race condition — a situation where the behavior of the program depends on the relative timing of events, leading to erroneous results. Here's an example of what could happen with two threads:
- Thread A reads the value of
num
(let's say it's 0).
- Thread B reads the value of
num
(still 0).
- Thread A increments the value and writes it back to
num
(now it's 1).
- Thread B increments the value it read earlier (0) and writes it back to
num
.
Now
num
is 1, even though num++
was executed twice. This is a simple example of a race condition that can occur if num++
is not done in a thread-safe way.To make
num++
thread-safe, we can use synchronization mechanisms provided by the Java language. For example, we can use the synchronized
keyword to ensure that these three operations are atomic, meaning they're executed as a single operation without being interrupted by other threads:public class Counter { private int num = 0; public synchronized void increment() { num++; } }
Synchronized In-depth
The
synchronized
keyword in Java is used to control access to critical sections of code that could cause data inconsistency if accessed by multiple threads concurrently.The
synchronized
keyword can be used in the following ways:- Synchronized Method: When a method is declared with the
synchronized
keyword, it's known as a synchronized method. Only one thread can access a synchronized method at a time. If a thread is in a synchronized method, all other threads that want to call any of the object's synchronized methods have to wait.
public synchronized void increment() { num++; }
- Synchronized Block: Synchronized blocks in Java are marked with the
synchronized
keyword. A synchronized block can be used to mark a specific section of the method as synchronized, rather than the whole method. This can help to improve performance because only the necessary part of the code is synchronized.
public void increment() { synchronized(this) { num++; } }
In this example, the
synchronized
keyword is followed by an object in parentheses. This object is called a monitor object. The code inside the synchronized block is only accessible to a single thread that has obtained the lock on the monitor object.Differences
- Granularity: Synchronized methods synchronize the entire method and prevent multiple threads from executing the method concurrently. On the other hand, synchronized blocks provide more granular control over the code. Only the code present inside the block is synchronized.
- Performance: Since synchronized blocks have a smaller scope compared to synchronized methods, they can lead to better performance in multithreaded applications.
- Flexibility: Synchronized blocks are more flexible than synchronized methods because they allow you to choose which object's lock will be used for synchronization. This way, you can control the level of concurrency in your program more precisely. When you synchronize on
this
, you are using the object's intrinsic lock. But you can also use other objects for synchronization. This can help to avoid unnecessary contention if different synchronized blocks are independent and don't need to be locked by the same object.
Remember, inappropriate use of
synchronized
can cause issues such as deadlocks, so it's important to understand it well and apply it appropriately.Â
Thread and Singleton
Â
The Singleton pattern is a design pattern that restricts the instantiation of a class to a single instance and provides a global point of access to it. In other words, a singleton class is one which only allows a single instance of itself to be created, and usually gives simple access to that instance.
Hunger Singleton
Instantiate on class load, even if it is not used, the resource will still be occupied and wasted.
Thread safe
Lazy Singleton
A lazy initialization in Singleton means that the Singleton instance is not created until it's needed. This can save resources, particularly if the Singleton object is resource-intensive to create.
However, in a multi-threaded environment, a lazy-initialized Singleton can be tricky because multiple threads can potentially try to initialize the instance at the same time. If the Singleton isn't properly thread-safe, this can lead to multiple instances being created, violating the Singleton pattern.
public class Singleton { private static Singleton instance; private Singleton() {} public static Singleton getInstance() { if (instance == null) { instance = new Singleton(); } return instance; } }
This is not thread-safe because if two threads call
getInstance()
at the same time when instance
is null
, they both could pass the if (instance == null)
check and create new instances of Singleton
.To make it thread-safe, we can add the
synchronized
keyword to the getInstance()
method:public class Singleton { private static Singleton instance; private Singleton() {} public static synchronized Singleton getInstance() { if (instance == null) { instance = new Singleton(); } return instance; } }
This is now thread-safe because only one thread can execute the
getInstance()
method at a time. However, it can be inefficient because once the Singleton instance is created, there's no need to synchronize the method anymore.Â
A more efficient way is to use "double-checked locking", which checks twice if the instance is
null
, once before synchronization and once within the synchronized block:public class Singleton { private static volatile Singleton instance; private Singleton() {} public static Singleton getInstance() { if (instance == null) { synchronized (Singleton.class) { if (instance == null) { instance = new Singleton(); } } } return instance; } }
In this version, the
getInstance()
method is only synchronized when instance
is null
. This reduces the overhead of acquiring a lock by checking the instance twice. The
volatile
keyword guarantees that changes made by one thread to the instance
variable are immediately visible to other threads. This ensures that the Singleton remains thread-safe and also improves performance.
How to prevent thread safety problem
Main cause for thread safety problems
- multi-threading environment
- multiple thread accessing same resources
- perform non-atomic operation to shared resources
How to prevent
- multi-threading environment
- Change from multi-threading to single-thread —> add lock to access
- multiple thread accessing same resources
- Do not share resources
- perform non-atomic operation to shared resources
- change non-atomic operation to atomic —> Use lock or JDK’s own atomic packages
Lock
Different type of locks
- Spinlock: When the time to access shared resources is short, and the frequency of context switching is high, it is not worth suspending and resuming threads. JVM implements a method to prevent the thread from being suspended when it does not acquire the lock and instead executes an empty loop. After looping a few times, if the lock is still not obtained, the thread is suspended.
- Blocking lock: This type of lock changes the running state of the thread, causing the thread to enter a blocked state and wait. When the corresponding signal is received (either a wake-up signal or a timer signal), the thread can enter a ready state. All threads in the ready state compete to enter the running state.
- Reentrant lock: A lock that supports a thread re-entering a lock, similar to having a key to a room that allows multiple entries.
- Read-write lock: Two locks, a read lock and a write lock. Write-write operations are mutually exclusive, read-write operations are mutually exclusive, and read-read operations can be shared.
- Mutex lock: Similar to going to the bathroom, once you enter the door, you close the door and do not allow others to enter.
- Pessimistic lock: Always assumes the worst case scenario. Each time data is accessed, it is assumed that others will modify it, so a lock is applied each time data is accessed. This way, others wanting to access this data will be blocked until they acquire the lock.
- Optimistic lock: Each time data is accessed, it is assumed that others will not modify it, so a lock is not applied. However, when updating, it checks whether others have updated this data in the meantime. This can be achieved using mechanisms such as versioning.
- Fair lock: Everyone lines up in an orderly manner, which is fair to everyone.
- Non-fair lock: Some people are in line, but newcomers may cut in line.
- Biased lock: Biased locks use a mechanism that releases the lock only when competition occurs. Therefore, when other threads try to compete for the biased lock, the thread holding the biased lock will release the lock.
- Exclusive lock: In the exclusive lock mode, only one thread can hold the lock at a time.
- Shared lock: Allows multiple threads to acquire the lock simultaneously, allowing concurrent access to shared resources.
Simple implementation of Reentrant lock
public class SimpleReentrantLock { private Thread owner = null; private int lockCount = 0; public synchronized void lock() throws InterruptedException { Thread currentThread = Thread.currentThread(); while (owner != null && owner != currentThread) { wait(); } owner = currentThread; lockCount++; } public synchronized void unlock() { Thread currentThread = Thread.currentThread(); if (currentThread == owner) { lockCount--; if (lockCount == 0) { owner = null; notify(); } } } public static void main(String[] args) { SimpleReentrantLock lock = new SimpleReentrantLock(); Runnable r = () -> { try { System.out.println("Thread starting: " + Thread.currentThread().getName()); lock.lock(); lock.lock(); // reentrant, should not block System.out.println("Thread inside critical section: " + Thread.currentThread().getName()); Thread.sleep(1000); lock.unlock(); lock.unlock(); // reentrant, should not throw an exception System.out.println("Thread ending: " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread t1 = new Thread(r); Thread t2 = new Thread(r); t1.start(); t2.start(); } }
In this example, the
lock
method will block if the lock is held by a different thread, and will immediately return if the lock is not held or is held by the current thread. The unlock
method will only release the lock if the current thread is the owner, and will do nothing otherwise.This is a very simple reentrant lock. Note that this does not handle many edge cases and complexities that a real-world lock would need to handle. For real-world use, you should generally use the
ReentrantLock
class provided by the Java standard library, which is well-tested and handles all these complexities for you.Brief Introduction to AQS
AbstractQueuedSynchronizer
(often referred to as AQS) is a class provided by Java as part of the java.util.concurrent.locks
package. It serves as the base for most of the synchronization and locking utilities in the Java Concurrency API. The class is abstract, meaning that it provides a skeletal implementation for developers to extend and use in their own synchronization classes.The "queued" in
AbstractQueuedSynchronizer
refers to the fact that this class uses a FIFO (First-In-First-Out) wait queue to manage threads that are waiting for a state to change (such as a lock to be released). This queue is doubly-linked, which allows the removal of threads from the middle of the queue when they get interrupted or time out.The "synchronizer" part refers to the fact that this class provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
Here's a high-level overview of how AQS works:
- State: AQS uses an integer to represent the state of the synchronizer (for a lock, this could represent whether it's locked or not). You can change the state using methods like
setState(int newState)
,compareAndSetState(int expect, int update)
, andgetState()
.
- Acquire: When a thread wants to acquire a lock or change the state of the synchronizer, it calls one of the
acquire
methods (likeacquire(int arg)
). If the state change isn't immediately available (for example, if the lock is held by another thread), theacquire
method adds the thread to the wait queue and parks it.
- Release: When a thread is done with the state or lock, it calls one of the
release
methods (likerelease(int arg)
). Therelease
method will change the state and if there are any waiting threads, it will unpark the first one in the queue.
- Condition Objects: AQS also provides a
ConditionObject
inner class that you can use to implement condition variables, which allow a thread to voluntarily give up a lock until a certain condition is met.
Notable classes that use AQS as their base include
ReentrantLock
, Semaphore
, CountDownLatch
, ReentrantReadWriteLock
, and FutureTask
.ReentrantReadWriteLock
ReentrantReadWriteLock
is a class in Java that provides a pair of associated locks, one for read-only operations and another one for writing. The read
lock may be held simultaneously
by multiple reader threads, as long as there are no writers. The write
lock is exclusive
, meaning that it can only be held by one thread at a time and all read and write
requests block
until the write lock is released.This lock allows multiple threads to read a resource concurrently, but if a thread wants to write or modify the resource, it must first acquire the write lock. This ensures that the write operation is done atomically, while still allowing high concurrency during read operations.
Â
import java.util.concurrent.locks.ReentrantReadWriteLock; public class Counter { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private int count = 0; // use read lock for getting the value public int getCount() { lock.readLock().lock(); try { return count; } finally { lock.readLock().unlock(); } } // use write lock for updating the value public void incrementCount() { lock.writeLock().lock(); try { count++; } finally { lock.writeLock().unlock(); } } }
Multiple threads can call
getCount()
concurrently, but if a thread calls incrementCount()
, it will have exclusive access until it releases the write lock.A couple of things to note:
ReentrantReadWriteLock
is "reentrant" because each lock that it provides, the read lock and the write lock, can be held by the same thread more than once. That is, the thread can acquire the lock, perform some work, and then acquire the lock again (re-enter the lock) if it needs to perform more work. When the thread is done, it must release the lock as many times as it acquired it.
- Also,
ReentrantReadWriteLock
provides a fairness policy, which can be either 'fair' or 'unfair'. A fair lock favors granting access to the longest-waiting thread. In contrast, an unfair lock makes no guarantees about the order in which threads are granted access. By default,ReentrantReadWriteLock
is unfair, which generally has higher throughput but less predictability.
Thread Communication
Wait, notify, notifyAll
The
wait
and notify
(or notifyAll
) methods in Java are used for inter-thread communication, particularly when different threads need to coordinate their actions or when one thread needs to wait for another thread to signal that it has completed a particular task. These methods are part of the Object class in Java and are used in conjunction with synchronized
blocks or methods.public class Shared { private String data; private boolean empty = true; public synchronized void produce(String data) { while (!empty) { try { wait(); } catch (InterruptedException e) { /* handle exception */ } } this.data = data; empty = false; notifyAll(); } public synchronized String consume() { while (empty) { try { wait(); } catch (InterruptedException e) { /* handle exception */ } } empty = true; notifyAll(); return data; } } public class Producer implements Runnable { private Shared shared; public Producer(Shared shared) { this.shared = shared; } @Override public void run() { for (int i = 0; i < 10; i++) { shared.produce("Data-" + i); } } } public class Consumer implements Runnable { private Shared shared; public Consumer(Shared shared) { this.shared = shared; } @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(shared.consume()); } } } public class Main { public static void main(String[] args) { Shared shared = new Shared(); Thread producerThread = new Thread(new Producer(shared)); Thread consumerThread = new Thread(new Consumer(shared)); producerThread.start(); consumerThread.start(); } }
- The
Shared
class represents a shared object that holds the data being passed between the producer and the consumer. It has two methods,produce
andconsume
, which are synchronized to ensure that they can't be called concurrently by different threads.
- The
produce
method waits until the shared object is empty, then it produces some data, stores it in the shared object, and callsnotifyAll
to wake up any threads that are waiting for the shared object to be filled.
- The
consume
method waits until the shared object is filled, then it consumes the data, empties the shared object, and callsnotifyAll
to wake up any threads that are waiting for the shared object to be emptied.
- The
Producer
andConsumer
classes implementRunnable
and represent the producer and consumer threads, respectively. Each thread has a reference to the shared object and repeatedly callsproduce
orconsume
.
- In the
main
method, aShared
object, aProducer
thread and aConsumer
thread are created, and the threads are started. The producer and consumer work concurrently, passing data back and forth through the shared object.
Pipelining
Pipelining is a form of parallel processing in which multiple threads operate on a data stream, with each thread responsible for a distinct stage of the data processing. It's essentially a mechanism of processing where the output of one thread becomes the input for the next thread.
PipedInputStream
and PipedOutputStream
are used in Java for creating a communication pipe between two threads where one thread can write data to the PipedOutputStream
, and the other thread can read that data from the PipedInputStream
.import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; public class PipeExample { public static void main(String[] args) throws IOException { final PipedOutputStream output = new PipedOutputStream(); final PipedInputStream input = new PipedInputStream(output); Thread writerThread = new Thread(() -> { try { output.write("Hello, world!".getBytes()); output.close(); } catch (IOException e) { e.printStackTrace(); } }); Thread readerThread = new Thread(() -> { try { int data = input.read(); while (data != -1) { System.out.print((char) data); data = input.read(); } input.close(); } catch (IOException e) { e.printStackTrace(); } }); writerThread.start(); readerThread.start(); } }
- We create a
PipedOutputStream
(output
) and aPipedInputStream
(input
) connected to theoutput
.
- The
writerThread
writes a string into theoutput
stream.
- The
readerThread
reads from theinput
stream, which is connected to theoutput
stream. It reads byte by byte until it gets-1
, which signifies the end of the stream, and prints each byte as a char to the console.
- We start both threads. Depending on the scheduler and the timing, you may sometimes see no output because the
readerThread
starts beforewriterThread
has had a chance to write anything. To avoid this, you could add some kind of synchronization, or you could makereaderThread
wait forwriterThread
to complete usingThread.join()
.
PipedInputSteam and PipedOutputStream Limitations
Using
PipedInputStream
and PipedOutputStream
is a somewhat low-level way of doing inter-thread communication in Java, and there are several potential pitfalls and limitations to be aware of:- Blocking behavior: Both reading and writing to a pipe are blocking operations. If you try to read from a
PipedInputStream
and there's no data, the thread will block until some data is written. Similarly, if thePipedOutputStream
's internal buffer is full (because the data hasn't been read yet), a thread trying to write will block until some data is read and space becomes available in the buffer.
- Buffer size limitation:
PipedInputStream
/PipedOutputStream
uses a relatively small fixed-size buffer (the default size is 1KB in Java 8). If you need to pass large amounts of data between threads, or if the producer thread can generate data much faster than the consumer thread can process it, you might run into problems with the buffer filling up.
- Need for proper synchronization: These classes require careful synchronization to avoid data corruption or other problems. For example, you should not attempt to use the same
PipedInputStream
orPipedOutputStream
from multiple threads without proper synchronization.
- Deadlock risk: There's a risk of deadlock if a thread attempts to read and write the same pipe without proper control over when and where data is being read or written.
- IOException on Thread Interruption: A thread blocked on
PipedInputStream.read()
orPipedOutputStream.write()
will throw anIOException
if interrupted. So if your code uses interruption to cancel tasks or shut down threads, you'll need to handle this properly.
- One-to-one pairing: Each
PipedOutputStream
must be connected to exactly onePipedInputStream
and vice versa. If you need to distribute data from one thread to multiple consumers, or collect data from multiple producers into one thread, you'll need to set up multiple pipes and manage them carefully.
Given these potential issues, for many common cases it might be easier and safer to use higher-level concurrency utilities like
BlockingQueue
, ExecutorService
, or CompletableFuture
, which take care of many of these details for you.Thread.join
thread.join(): This method allows one thread to wait until another thread completes its execution. If
t
is a Thread object whose thread is currently executing, then t.join()
will make sure that t
is terminated before the next instruction is executed by the program. If several threads call join()
on the same thread, this is still valid, and all threads will block until the thread—all are waiting for—terminates.Thread thread1 = new Thread(() -> { System.out.println("Start thread1."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("End thread1."); }); Thread thread2 = new Thread(() -> { System.out.println("Start thread2."); try { thread1.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("End thread2."); }); thread1.start(); thread2.start();
In this code, thread1 sleeps for 2 seconds, and thread2 starts and immediately tries to join with thread1. Because of the
join()
call, thread2 will wait for thread1 to finish before it prints "End thread2."Â
ThreadLocal
ThreadLocal
is a class in Java that allows you to create variables that can only be read and written by the same thread. Even if two threads are executing the same code, and the code has a reference to a ThreadLocal
object, the two threads cannot see each other's ThreadLocal
variables.ThreadLocal
instances are typically private static fields in classes that wish to associate state with a thread (such as a user ID or transaction ID). Here is a simple example of how to use ThreadLocal
:public class Example { private static ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return (int) (Math.random() * 100); } }; public static void main(String[] args) { for(int i=0; i<10; i++){ new Thread(() -> { try { System.out.println("Thread's ID: " + threadId.get()); } finally { threadId.remove(); } }).start(); } } }
In this example, each thread gets a unique, randomly generated ID because each call to
threadId.get()
is confined to the thread that makes the call.Note that although
ThreadLocal
can be a powerful tool for associating per-thread state with code that is concurrent, it should be used with care. ThreadLocal
variables are common sources of memory leaks in Java web applications if they are not cleaned up after use, because they can keep a reference to their containing thread alive even after the thread has finished executing. This is particularly true in environments where threads are reused, such as thread pools.To avoid such memory leaks, any
ThreadLocal
variables should be cleared once they are not needed anymore. This can be done by calling the remove()
method on ThreadLocal
.Condition
java.util.concurrent.locks.Condition
is used with a Lock
to manage concurrent access to a shared resource. A Condition
instance provides methods to await
(causing the current thread to be disabled for thread scheduling purposes and to lie dormant until it's signaled) and signal
(waking up one or more threads that are waiting on this condition).The primary advantage of
Condition
over the basic Object.wait
and Object.notify
methods is the ability to have multiple Condition
objects per lock, giving you more granular control over which threads get woken up.import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumerExample { LinkedList<Integer> buffer = new LinkedList<>(); int capacity = 4; ReentrantLock lock = new ReentrantLock(); Condition notEmpty = lock.newCondition(); Condition notFull = lock.newCondition(); // Producer class class Producer implements Runnable { public void run() { try { int value = 0; while (true) { lock.lock(); try { while (buffer.size() == capacity) { notFull.await(); } System.out.println("Producer produced-" + value); buffer.add(value++); notEmpty.signal(); } finally { lock.unlock(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } // Consumer class class Consumer implements Runnable { public void run() { try { while (true) { lock.lock(); try { while (buffer.isEmpty()) { notEmpty.await(); } int value = buffer.removeFirst(); System.out.println("Consumer consumed-" + value); notFull.signal(); } finally { lock.unlock(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } public void start() { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } public static void main(String[] args) { new ProducerConsumerExample().start(); } }
- The
Condition
instancesnotEmpty
andnotFull
are used to manage the state of the buffer.
- When the buffer is full, the producer calls
notFull.await()
to wait until space is available. When the consumer removes an item from the buffer, it callsnotFull.signal()
to notify the producer that space is available.
- Similarly, when the buffer is empty, the consumer calls
notEmpty.await()
to wait until an item is available. When the producer adds an item to the buffer, it callsnotEmpty.signal()
to notify the consumer that an item is available.
This is a classic example of using
Condition
for inter-thread communication. This example could be further improved by using signalAll()
instead of signal()
to wake up all waiting threads if the application requires such behavior.Atomic
Atomic classes in Java, which are part of the
java.util.concurrent.atomic
package, are used for performing atomic operations, which are operations that are performed as a single unit of work without the possibility of interference from other operations. Atomic operations are thread-safe, which means they can be safely used in multithreaded environments without the need for additional synchronization.import java.util.concurrent.atomic.AtomicInteger; public class AtomicExample { private AtomicInteger counter = new AtomicInteger(0); public void incrementCounter() { counter.incrementAndGet(); } public int getCounter() { return counter.get(); } }
In this example,
counter
is an AtomicInteger
. The incrementAndGet
method atomically increments the current value by one and returns the updated value, which ensures that the increment operation is thread-safe.Â
import java.util.concurrent.atomic.LongAccumulator; public class AccumulatorExample { private LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE); public void accumulate(long value) { accumulator.accumulate(value); } public long getAccumulator() { return accumulator.get(); } }
In this example,
accumulator
is a LongAccumulator
that is initialized to Long.MIN_VALUE
and uses Long::max
as the accumulation function. The accumulate
method atomically updates the current value with the maximum of the current value and the given value, which ensures that the operation is thread-safe.Â