Implementing a Blocking Queue (LLD)
Below is a low-level design for a blocking queue in Java, implemented without using built-in concurrent utilities like BlockingQueue. This design uses the concepts of thread synchronization (wait() an
A blocking queue is a thread-safe data structure that supports operations to add and remove elements, where these operations may block under certain conditions. It is commonly used in multithreaded programming to coordinate the exchange of data between producer and consumer threads.
Key Characteristics of a Blocking Queue
Thread Safety:
Blocking queues are inherently thread-safe, meaning multiple threads can safely access them without external synchronization.
Blocking Behavior:
put() operation: Blocks if the queue is full until space becomes available.
take() operation: Blocks if the queue is empty until an element is available.
Bounded or Unbounded:
A blocking queue can have a fixed capacity (bounded) or grow indefinitely (unbounded).
Producer-Consumer Model:
Blocking queues are ideal for implementing the producer-consumer pattern, where producers add items to the queue and consumers remove them.
How Does a Blocking Queue Work?
When Full: If a producer tries to insert an item into a full queue, it will block until a consumer removes an item, freeing up space.
When Empty: If a consumer tries to take an item from an empty queue, it will block until a producer inserts an item.
This blocking behavior ensures smooth coordination between producers and consumers.
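To make the blocking behavior concrete, here is a minimal sketch that uses the JDK's ArrayBlockingQueue as a stand-in for any bounded blocking queue. The class name BlockingBehaviorDemo and the helper transfer() are made up for this illustration. With a capacity of 2 and 5 items, the producer has to block on put() whenever it gets two items ahead of the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the implementation in this post.
class BlockingBehaviorDemo {

    // Moves `count` integers from a producer thread to the calling thread
    // through a bounded queue and returns them in the order received.
    static List<Integer> transfer(int count, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue already holds `capacity` items
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during transfer", e);
        }
        return received;
    }

    public static void main(String[] args) {
        // One producer and one consumer on a FIFO queue, so order is preserved.
        System.out.println(transfer(5, 2)); // prints [1, 2, 3, 4, 5]
    }
}
```

Neither thread polls or sleeps here; the queue itself parks and resumes them as space and items become available.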
Common Implementations in Java
Java provides several built-in blocking queue implementations in the java.util.concurrent package, such as:
ArrayBlockingQueue: A fixed-capacity blocking queue backed by an array.
LinkedBlockingQueue: An optionally bounded queue backed by linked nodes.
PriorityBlockingQueue: An unbounded blocking queue that orders elements by their natural ordering or by a supplied Comparator.
DelayQueue: An unbounded blocking queue of Delayed elements, where an element can be taken only after its delay has expired.
SynchronousQueue: A queue with no capacity, where each put() must wait for a take() and vice versa.
Advantages of a Blocking Queue
Simplifies Thread Synchronization:
Automatically handles thread-safe interactions between producers and consumers.
Built-in Blocking:
Removes the need to manually implement complex locking and waiting mechanisms.
Flexible Design:
Supports both bounded and unbounded configurations, suiting various application needs.
Common Use Cases
Producer-Consumer Model:
For systems where one or more threads produce data, and other threads consume it.
Task Scheduling:
Storing and distributing tasks among worker threads.
Logging Systems:
A logger can queue log messages and write them asynchronously to a file or external system.
Message Queues:
Used in messaging systems or pipelines for decoupling producers and consumers.
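As a rough illustration of the logging use case, the sketch below queues messages on a LinkedBlockingQueue and lets a single writer thread drain them. Everything here is hypothetical and simplified: the class name AsyncLoggerSketch, the STOP sentinel, the capacity of 1024, and the in-memory list that stands in for a file or external system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of an asynchronous logger built on a blocking queue.
class AsyncLoggerSketch {
    // Sentinel ("poison pill") compared by identity, so no real message can match it.
    private static final String STOP = new String("__STOP__");

    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>(1024);
    private final List<String> sink = new ArrayList<>(); // stand-in for a file; written only by the writer thread
    private final Thread writer = new Thread(this::drain, "log-writer");

    void start() {
        writer.start();
    }

    // Called by application threads; blocks only if 1024 messages are already pending.
    void log(String message) throws InterruptedException {
        pending.put(message);
    }

    // Writer loop: take() blocks while there is nothing to write.
    private void drain() {
        try {
            while (true) {
                String message = pending.take();
                if (message == STOP) {
                    return;
                }
                sink.add(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Asks the writer to stop after the messages already queued, then waits for it.
    List<String> close() throws InterruptedException {
        pending.put(STOP);
        writer.join(); // after join(), reading `sink` from this thread is safe
        return sink;
    }

    static List<String> demo() {
        AsyncLoggerSketch logger = new AsyncLoggerSketch();
        logger.start();
        try {
            logger.log("a");
            logger.log("b");
            logger.log("c");
            return logger.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while logging", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```

The same shape (producers call put(), one or more workers loop on take(), a sentinel ends the loop) also covers the task-scheduling and message-queue cases.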
In essence, a blocking queue provides an elegant solution for coordinating data exchange between threads, ensuring smooth, thread-safe communication with built-in blocking mechanisms.
import java.util.LinkedList;

public class BlockingQueue<T> {
    private final LinkedList<T> queue; // Internal queue to store items
    private final int capacity;        // Maximum size of the queue

    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Queue capacity must be greater than 0");
        }
        this.capacity = capacity;
        this.queue = new LinkedList<>();
    }

    // Method to add an item to the queue
    public synchronized void put(T item) throws InterruptedException {
        // Wait until there is space in the queue.
        // The loop re-checks the condition after every wake-up.
        while (queue.size() == capacity) {
            wait(); // Release the lock and wait to be notified
        }
        // Add item to the queue
        queue.addLast(item);
        // Notify waiting threads that an item has been added
        notifyAll();
    }

    // Method to retrieve and remove an item from the queue
    public synchronized T take() throws InterruptedException {
        // Wait until there is at least one item in the queue
        while (queue.isEmpty()) {
            wait(); // Release the lock and wait to be notified
        }
        // Remove the item from the front of the queue
        T item = queue.removeFirst();
        // Notify waiting threads that space is now available
        notifyAll();
        return item;
    }

    // Method to get the current size of the queue
    public synchronized int size() {
        return queue.size();
    }
}
How It Works
put(T item): Checks if the queue is full (queue.size() == capacity). If full, the calling thread is blocked using wait(). When space becomes available, the item is added, and waiting threads are notified using notifyAll().
take(): Checks if the queue is empty (queue.isEmpty()). If empty, the calling thread is blocked using wait(). When an item becomes available, it is removed from the queue, and waiting threads are notified.
Thread-Safe Operations:
All methods are synchronized to ensure that only one thread manipulates the queue at a time.
The wait() and notifyAll() methods are used to manage blocking and signaling between producer and consumer threads.
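The wait()/notifyAll() design can be exercised with a small test harness like the one below. It is only a sketch: WaitNotifyDemo, SimpleBlockingQueue (a condensed, renamed copy of the synchronized queue so the file compiles on its own), and runAndSum() are names invented for this example.

```java
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical harness for the synchronized wait()/notifyAll() queue.
class WaitNotifyDemo {

    // Condensed copy of the synchronized queue, renamed to keep this file self-contained.
    static class SimpleBlockingQueue<T> {
        private final LinkedList<T> items = new LinkedList<>();
        private final int capacity;

        SimpleBlockingQueue(int capacity) {
            this.capacity = capacity;
        }

        synchronized void put(T item) throws InterruptedException {
            while (items.size() == capacity) {
                wait(); // full: release the monitor and wait
            }
            items.addLast(item);
            notifyAll();
        }

        synchronized T take() throws InterruptedException {
            while (items.isEmpty()) {
                wait(); // empty: release the monitor and wait
            }
            T item = items.removeFirst();
            notifyAll();
            return item;
        }
    }

    // Starts `pairs` producers and `pairs` consumers. Each producer puts 1..perThread,
    // each consumer takes perThread items. Returns the sum of everything consumed.
    static long runAndSum(int pairs, int perThread, int capacity) {
        SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<>(capacity);
        AtomicLong sum = new AtomicLong();
        Thread[] workers = new Thread[pairs * 2];
        for (int p = 0; p < pairs; p++) {
            workers[2 * p] = new Thread(() -> {
                try {
                    for (int i = 1; i <= perThread; i++) {
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[2 * p + 1] = new Thread(() -> {
                try {
                    for (int i = 0; i < perThread; i++) {
                        sum.addAndGet(queue.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread w : workers) {
            w.start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        // 2 producers each put 1..100, so the consumers should see 2 * 5050 in total.
        System.out.println(runAndSum(2, 100, 3)); // prints 10100
    }
}
```

Producers and consumers wait on the same monitor here, so each notifyAll() wakes both kinds of thread. The while loops around wait() are what keep that correct, because a woken thread re-checks its condition before proceeding.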
For finer-grained control, you can use an explicit Lock with Condition objects instead of synchronized methods:
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AdvancedBlockingQueue<T> {
    private final LinkedList<T> queue; // Internal queue to store items
    private final int capacity;        // Maximum size of the queue
    private final Lock lock;           // Lock for thread synchronization
    private final Condition notFull;   // Condition to signal when queue is not full
    private final Condition notEmpty;  // Condition to signal when queue is not empty

    public AdvancedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Queue capacity must be greater than 0");
        }
        this.capacity = capacity;
        this.queue = new LinkedList<>();
        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.notEmpty = lock.newCondition();
    }

    // Method to add an item to the queue
    public void put(T item) throws InterruptedException {
        lock.lock(); // Acquire the lock
        try {
            // Wait until there is space in the queue
            while (queue.size() == capacity) {
                notFull.await(); // Wait for the notFull condition
            }
            // Add item to the queue
            queue.addLast(item);
            // Signal that the queue is not empty
            notEmpty.signal();
        } finally {
            lock.unlock(); // Release the lock
        }
    }

    // Method to retrieve and remove an item from the queue
    public T take() throws InterruptedException {
        lock.lock(); // Acquire the lock
        try {
            // Wait until there is at least one item in the queue
            while (queue.isEmpty()) {
                notEmpty.await(); // Wait for the notEmpty condition
            }
            // Remove the item from the front of the queue
            T item = queue.removeFirst();
            // Signal that the queue is not full
            notFull.signal();
            return item;
        } finally {
            lock.unlock(); // Release the lock
        }
    }

    // Method to get the current size of the queue
    public int size() {
        lock.lock(); // Acquire the lock
        try {
            return queue.size();
        } finally {
            lock.unlock(); // Release the lock
        }
    }
}
Key Advantages of Using Locks and Conditions
Fine-Grained Control:
With Condition, you can control specific conditions (notFull and notEmpty), avoiding unnecessary wake-ups that happen with notifyAll().
Better Scalability:
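Using ReentrantLock with separate conditions wakes only the threads that can actually make progress, which can reduce wasted context switches compared to synchronized methods with notifyAll(), especially in systems with high concurrency. Note that a single lock still serializes access to the queue.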
Improved Readability:
Separating conditions like notFull and notEmpty makes the code easier to understand and extend.
How It Works
put(): Acquires the lock. Waits if the queue is full using notFull.await(). Adds the item to the queue and signals the notEmpty condition.
take(): Acquires the lock. Waits if the queue is empty using notEmpty.await(). Removes the item from the queue and signals the notFull condition.
size(): Acquires the lock to ensure thread-safe access to the current size.
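One example of the finer control that Condition offers is a timed wait. The sketch below is a hypothetical extension, not part of the design above: TimedBlockingQueue, offer(), poll(), and demo() are illustrative names, and the only library call added is Condition.awaitNanos(), which returns an estimate of the remaining wait time.

```java
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical variant of the lock-based queue whose operations give up after a timeout.
class TimedBlockingQueue<T> {
    private final LinkedList<T> queue = new LinkedList<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TimedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("Queue capacity must be greater than 0");
        }
        this.capacity = capacity;
    }

    // Returns true if the item was added, false if the queue stayed full for the whole timeout.
    boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.size() == capacity) {
                if (nanos <= 0L) {
                    return false; // time budget used up
                }
                nanos = notFull.awaitNanos(nanos); // remaining time after this wait
            }
            queue.addLast(item);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Returns the head item, or null if the queue stayed empty for the whole timeout.
    T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (queue.isEmpty()) {
                if (nanos <= 0L) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            T item = queue.removeFirst();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Single-threaded walk-through on a capacity-1 queue.
    static String demo() {
        TimedBlockingQueue<String> q = new TimedBlockingQueue<>(1);
        try {
            boolean first = q.offer("a", 10, TimeUnit.MILLISECONDS);  // space available
            boolean second = q.offer("b", 10, TimeUnit.MILLISECONDS); // full, times out
            String head = q.poll(10, TimeUnit.MILLISECONDS);          // returns "a"
            String none = q.poll(10, TimeUnit.MILLISECONDS);          // empty, times out
            return first + " " + second + " " + head + " " + none;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true false a null
    }
}
```

Because null is used to report a timeout from poll(), this sketch assumes callers never enqueue null items.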