Question
Design Approach:
Customers post queries on the portal.
Advisors have schedules, including their next available time.
The system assigns queries to advisors based on their availability.
A priority queue ensures queries are assigned in a first-come, first-serve manner.
Multithreading (optional) can be used to simulate scheduling.
Technology Stack:
Java (Concurrency, Priority Queue)
Data structures:
PriorityQueue
,ConcurrentHashMap
Scheduling logic using ExecutorService
How It Works
Advisors are stored in a PriorityQueue, sorted by their
nextAvailableTime
.A BlockingQueue holds customer queries.
A background thread processes queries and assigns them to the next available advisor.
Each advisor gets scheduled for 30 minutes per query.
The assigned advisor is reinserted into the queue after updating availability.
Possible Enhancements
Database Storage: Store advisors' schedules persistently.
Concurrency Improvements: Implement
CompletableFuture
for real-time scheduling.Load Balancing: Consider dynamically adjusting advisor schedules.
The scheduler in the EventScheduler
class is an ExecutorService that continuously processes incoming queries in the background. It ensures that tasks are scheduled asynchronously and assigned to advisors efficiently without blocking the main thread.
Key Uses of the Scheduler
Background Processing
The scheduler runs a separate thread that keeps checking for new customer queries.
This prevents blocking the main application thread.
Asynchronous Query Handling
The
processQueries()
method runs in an independent thread, ensuring queries are assigned to advisors without delays.Without the scheduler, queries would have to be assigned synchronously, causing unnecessary waiting.
Efficient Task Scheduling
The scheduler picks up queries one by one, checks for the next available advisor, and assigns the task accordingly.
It also retries unassigned queries if no advisor is currently available.
Graceful Shutdown Management
The scheduler can be shut down cleanly using
scheduler.shutdown()
, ensuring no tasks are left unprocessed when stopping the application.
What Would Happen Without a Scheduler?
The main thread would block while processing queries.
Queries might not be assigned efficiently, leading to delays.
Advisors might not get reassigned properly after finishing tasks.
Alternative: Using a ScheduledExecutorService
Instead of Executors.newSingleThreadExecutor()
, we could use a ScheduledExecutorService to run the task at fixed intervals, checking for queries periodically.
Code
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
class Advisor implements Comparable<Advisor> {
private final int id;
private final String name;
private LocalDateTime nextAvailableTime;
public Advisor(int id, String name) {
this.id = id;
this.name = name;
this.nextAvailableTime = null; // Initially, no task assigned
}
public int getId() {
return id;
}
public String getName() {
return name;
}
public LocalDateTime getNextAvailableTime() {
return nextAvailableTime;
}
public boolean isAvailable() {
return nextAvailableTime == null || nextAvailableTime.isBefore(LocalDateTime.now());
}
public void assignTask(int processingTimeMinutes) {
this.nextAvailableTime = LocalDateTime.now().plusMinutes(processingTimeMinutes);
}
@Override
public int compareTo(Advisor other) {
return this.nextAvailableTime == null ? -1 :
other.nextAvailableTime == null ? 1 :
this.nextAvailableTime.compareTo(other.nextAvailableTime);
}
}
class CustomerQuery {
private static int counter = 0;
private final int id;
private final String issue;
private final LocalDateTime createdAt;
private final int processingTime; // Variable processing time for each query
public CustomerQuery(String issue) {
this.id = ++counter;
this.issue = issue;
this.createdAt = LocalDateTime.now();
this.processingTime = new Random().nextInt(20) + 10; // Random processing time (10-30 min)
}
public int getId() {
return id;
}
public String getIssue() {
return issue;
}
public LocalDateTime getCreatedAt() {
return createdAt;
}
public int getProcessingTime() {
return processingTime;
}
@Override
public String toString() {
return "Query{" + "id=" + id + ", issue='" + issue + '\'' +
", createdAt=" + createdAt + ", processingTime=" + processingTime + " mins}";
}
}
class EventScheduler {
private final PriorityQueue<Advisor> advisorQueue = new PriorityQueue<>();
private final BlockingQueue<CustomerQuery> queryQueue = new LinkedBlockingQueue<>();
private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
public EventScheduler(List<Advisor> advisors) {
advisorQueue.addAll(advisors);
scheduler.submit(this::processQueries);
}
public void addQuery(CustomerQuery query) {
queryQueue.add(query);
}
private void processQueries() {
while (true) {
try {
CustomerQuery query = queryQueue.take();
Advisor advisor = advisorQueue.poll();
if (advisor != null && advisor.isAvailable()) {
System.out.println("Assigning " + query + " to Advisor: " + advisor.getName());
// Assign task with variable processing time
advisor.assignTask(query.getProcessingTime());
// Reinsert advisor back to queue
advisorQueue.add(advisor);
} else {
// If no available advisor, retry later
if (advisor != null) {
advisorQueue.add(advisor);
}
System.out.println("No available advisor. Retrying...");
Thread.sleep(1000);
queryQueue.add(query);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
public void shutdown() {
scheduler.shutdown();
}
}
public class EventSchedulerApp {
public static void main(String[] args) {
List<Advisor> advisors = Arrays.asList(
new Advisor(1, "Alice"),
new Advisor(2, "Bob"),
new Advisor(3, "Charlie")
);
EventScheduler scheduler = new EventScheduler(advisors);
scheduler.addQuery(new CustomerQuery("Issue 1"));
scheduler.addQuery(new CustomerQuery("Issue 2"));
scheduler.addQuery(new CustomerQuery("Issue 3"));
scheduler.addQuery(new CustomerQuery("Issue 4"));
scheduler.addQuery(new CustomerQuery("Issue 5"));
try {
Thread.sleep(10000); // Simulate runtime
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduler.shutdown();
}
}
What is the Main Thread?
In Java, when a program starts, it begins execution in a single thread called the main thread. This thread is responsible for executing the main
method of the program.
public class Main {
public static void main(String[] args) {
System.out.println("Hello, World!");
}
}
How Does the Main Thread Work?
The JVM (Java Virtual Machine) creates the main thread automatically when a Java program starts.
The main thread executes all sequential operations unless new threads are explicitly created.
If the main thread is blocked, the entire program might hang unless other threads exist.
Why Do We Need Additional Threads?
If a program is running in a single main thread, tasks will execute one after another, meaning:
If a long-running task (e.g., processing queries) blocks the main thread, the entire program will stop responding.
If we want to handle multiple tasks concurrently, we need additional threads.
Main Thread in Our Event Scheduler
In your EventSchedulerApp
, the main thread:
Starts the application (
main
method).Creates advisors and initializes the scheduler.
Submits queries to the scheduler.
Sleeps for 5 seconds (to let queries be processed).
Calls
shutdown()
to stop the scheduler.
However, the EventScheduler
runs in a separate background thread using:
private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
What Happens Without Multi-Threading?
If we don’t use a scheduler (executor service), the processQueries()
method would block the main thread, preventing the program from executing anything else.
Example (Without Multi-threading):
public void processQueries() {
while (true) {
CustomerQuery query = queryQueue.poll();
if (query != null) {
System.out.println("Processing: " + query.getIssue());
try {
Thread.sleep(5000); // Simulate query processing (Blocking)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
The main thread gets stuck inside this loop.
The program cannot accept new queries or perform any other operations.
This allows:
The main thread to continue execution without being blocked.
The
processQueries()
method to run asynchronously in a separate thread.
Let's explore advanced threading techniques in Java, focusing on ThreadPoolExecutor
and ScheduledExecutorService
1️⃣ ThreadPoolExecutor (Advanced Thread Pool)
🔹 What is it?
ThreadPoolExecutor
is a powerful thread pool manager that lets you control:
Number of threads (core and max)
Task queue size
Thread keep-alive time
Rejection policies (handling overflowed tasks)
🔹 When to Use?
Use ThreadPoolExecutor
when: ✅ You need dynamic control over threads
✅ You want to reuse threads efficiently
✅ You need to handle high throughput tasks
✅ Example: Using ThreadPoolExecutor in Our Event Scheduler
import java.time.LocalDateTime;
import java.util.concurrent.*;
class EventScheduler {
private final PriorityQueue<Advisor> advisorQueue = new PriorityQueue<>();
private final BlockingQueue<CustomerQuery> queryQueue = new LinkedBlockingQueue<>();
// Use ThreadPoolExecutor with 2-5 worker threads
private final ExecutorService scheduler = new ThreadPoolExecutor(
2, // Core threads
5, // Max threads
1, TimeUnit.MINUTES, // Keep-alive time
new LinkedBlockingQueue<>(10) // Queue size of 10 tasks
);
public EventScheduler() {
scheduler.submit(this::processQueries);
}
public void addQuery(CustomerQuery query) {
queryQueue.add(query);
}
private void processQueries() {
while (true) {
try {
CustomerQuery query = queryQueue.take();
Advisor advisor = advisorQueue.poll();
if (advisor != null) {
System.out.println("Assigning " + query + " to Advisor: " + advisor.getName());
// Process query in a separate worker thread
scheduler.submit(() -> handleQuery(query, advisor));
} else {
System.out.println("No available advisor. Retrying...");
Thread.sleep(1000);
queryQueue.add(query);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private void handleQuery(CustomerQuery query, Advisor advisor) {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(2000, 5000)); // Simulate different processing times
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Completed: " + query + " by Advisor: " + advisor.getName());
// Update advisor availability
advisor.setNextAvailableTime(LocalDateTime.now().plusMinutes(30));
advisorQueue.add(advisor);
}
public void shutdown() {
scheduler.shutdown();
}
}
🔹 Why is this better than a single-threaded executor?
✅ Supports multiple queries at the same time
✅ Allows for dynamic scalability
✅ Handles high loads better
✅ Efficient task queuing
2️⃣ ScheduledExecutorService (For Delayed & Repeated Tasks)
🔹 What is it?
It schedules tasks at fixed rates or after delays.
Useful for periodic tasks (e.g., retrying failed queries, health checks, polling).
import java.util.concurrent.*;
class ScheduledQueryProcessor {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
public void startProcessing() {
scheduler.scheduleAtFixedRate(() -> {
System.out.println("Checking for new queries...");
// Simulate query processing
}, 0, 5, TimeUnit.SECONDS); // Runs every 5 seconds
}
public void shutdown() {
scheduler.shutdown();
}
}
When to Use?
✅ Background tasks (e.g., cleaning up old data)
✅ Retry mechanisms (e.g., failed queries)
✅ Periodic health checks
💡 Summary
Use
ThreadPoolExecutor
when handling multiple queries concurrently.Use
ScheduledExecutorService
for periodic jobs like retries, monitoring, or cleanup.Both improve efficiency, scalability, and responsiveness compared to a single-threaded scheduler.