Design and implement a Call Center
Author: Mohammad J Iqbal
Constraints and assumptions
- There are three levels of employees: a) Operator b) Supervisor c) Director
- Assume an operator always gets the initial call. If the operator can handle the call, it is marked completed; if not, it is escalated to a Supervisor. If the Supervisor can’t handle the call, it is escalated to a Director. A Director can’t escalate the call and must resolve the issue.
- If a call is intended for a level and no one at that level is available, the call is queued on that level’s call queue.
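The escalation rule in the constraints above can be sketched as a small lookup from one level to the next. This is only an illustration: EscalationRank and next() are hypothetical names that do not appear in the classes later in this article, which use Employee.Rank and an escalateCall() method instead.

```java
import java.util.Optional;

// Hypothetical helper for illustration; the article's code uses Employee.Rank instead.
enum EscalationRank {
    OPERATOR, SUPERVISOR, DIRECTOR;

    // Returns the next level up, or empty for DIRECTOR, who must resolve the call.
    Optional<EscalationRank> next() {
        EscalationRank[] all = values();
        int nextIndex = ordinal() + 1;
        return nextIndex < all.length ? Optional.of(all[nextIndex]) : Optional.empty();
    }
}
```

An empty result for DIRECTOR mirrors the constraint that a Director cannot escalate any further.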
Extra credit:
- Thread safety
- Let a customer connect directly to a supervisor or director, skipping the lower levels.
- Implement functionality that checks all call queues at a fixed interval and assigns queued calls to employees of the matching level.
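For the thread-safety item, the core problem is that two threads must never pick the same free employee. One possible way to guarantee this, shown here as a minimal sketch and not as the approach this article takes (the code below uses a lock), is to claim the employee with AtomicBoolean.compareAndSet. The class AvailabilitySketch and its methods are hypothetical names used only for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for one employee's availability flag.
class AvailabilitySketch {
    private final AtomicBoolean free = new AtomicBoolean(true);

    // Atomically flips free -> busy; only one competing thread gets true.
    boolean tryClaim() {
        return free.compareAndSet(true, false);
    }

    void release() {
        free.set(true);
    }

    // Starts `threads` threads that all try to claim the same employee at once
    // and returns how many of them succeeded.
    static int raceForClaim(int threads) {
        AvailabilitySketch employee = new AvailabilitySketch();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before racing
                    if (employee.tryClaim()) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool[i].start();
        }
        start.countDown();
        try {
            for (Thread t : pool) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the race", e);
        }
        return winners.get();
    }
}
```

However many threads race, compareAndSet(true, false) succeeds for exactly one of them, so raceForClaim should always report a single winner.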
Let’s identify the classes
- CallCenter (Main/Driver Class)
- Call
- Caller
- Employee (CallHandler) -> Operator, Supervisor, Director
Class: Caller
public class Caller {
String callerId;
private volatile boolean isIssueResolved;
public Caller(String callerId) {
this.callerId = callerId;
}
public void setIssueResolved(boolean flag) {
this.isIssueResolved = flag;
}
public boolean isIssueResolved() {
...
}
}
Class: Call
public class Call {
public enum CallState{
READY, IN_PROGRESS, COMPLETE
}
/* Minimal rank of employee who can handle this call */
Employee.Rank rank;
Caller caller;
Employee handler;
CallState callState;
public Call(Caller c, Employee.Rank rank){
this.caller = c;
this.rank = rank;
this.callState = Call.CallState.READY;
}
public void setHandler(Employee handler) {
this.handler = handler;
}
}
Class: Employee
public abstract class Employee {
String empId;
String name;
public Rank rank;
volatile Call currentCall;
CallCenter callCenter;
private final AtomicBoolean isFree = new AtomicBoolean(true);
public void setCallCenter(CallCenter callCenter) {
this.callCenter = callCenter;
}
public enum Rank {
OPERATOR, SUPERVISOR, DIRECTOR
}
public Employee(String empId, String name, Rank rank) {
this.empId = empId;
this.name = name;
this.rank = rank;
}
abstract void escalateCall();
public void receiveCall(Call call) {
...
}
public void completeCall() {
...
}
public boolean isFree() {
return isFree.get() && currentCall == null;
}
boolean assignNewCall(Call call) {
...
}
public void setFree(boolean free) {
...
}
private void processCall(Call call, Employee employee) {
...
}
}
Classes: Operator, Supervisor, Director
public class Operator extends Employee {
public Operator(String empId, String name) {
super(empId, name, Rank.OPERATOR);
}
@Override
void escalateCall() {
...
}
}
public class Supervisor extends Employee {
public Supervisor(String empId, String name) {
super(empId, name, Rank.SUPERVISOR);
}
@Override
void escalateCall() {
...
}
}
public class Director extends Employee {
public Director(String empId, String name){
super(empId,name, Employee.Rank.DIRECTOR);
}
@Override
void escalateCall() {
...
}
}
Class: CallCenter
public class CallCenter {
String name;
private final Map<Employee.Rank, Queue<Call>> callQueues = new ConcurrentHashMap<>();
private final Map<Employee.Rank, List<Employee>> allEmployees = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable assignCallFromQueue = this::checkAvailabilityAndAssign;
public CallCenter(String name, List<Employee> operators, List<Employee> supervisors, List<Employee> directors) {
callQueues.put(Employee.Rank.OPERATOR, new LinkedList<>());
callQueues.put(Employee.Rank.SUPERVISOR, new LinkedList<>());
callQueues.put(Employee.Rank.DIRECTOR, new LinkedList<>());
// Associate all employee to this call center
operators.forEach(o -> o.setCallCenter(this));
supervisors.forEach(s -> s.setCallCenter(this));
directors.forEach(d -> d.setCallCenter(this));
allEmployees.put(Employee.Rank.OPERATOR, operators);
allEmployees.put(Employee.Rank.SUPERVISOR, supervisors);
allEmployees.put(Employee.Rank.DIRECTOR, directors);
// Schedule assignCallFromQueue
scheduler.scheduleAtFixedRate(assignCallFromQueue, 5, 10, SECONDS);
}
// Adds a call to the respective call queue by call handler's rank
public void addCallToQueue(Call call) {
...
}
public void handleCall(Call call) {
...
}
private void dispatchCall(Call call, Optional<Employee> employeeOptional) {
...
}
private Optional<Employee> findAvailableEmployee(Employee.Rank rank) {
...
}
private void checkAvailabilityAndAssign() {
...
}
}
Implementation:
Now that the design is done, we can dive into the implementation and fill in each method in detail:
Class: Caller
package com.spsoft.callcenter;
import java.util.concurrent.ThreadLocalRandom;
public class Caller {
String callerId;
private volatile boolean isIssueResolved;
public Caller(String callerId) {
this.callerId = callerId;
}
public void setIssueResolved(boolean flag) {
this.isIssueResolved = flag;
}
public boolean isIssueResolved() {
// Simulation only: every check resolves the issue with a 50% probability
isIssueResolved = ThreadLocalRandom.current().nextDouble() < 0.5;
return isIssueResolved;
}
}
Class: Call
package com.spsoft.callcenter;
public class Call {
public enum CallState{
READY, IN_PROGRESS, COMPLETE
}
/* Minimal rank of employee who can handle this call */
Employee.Rank rank;
Caller caller;
Employee handler;
CallState callState;
public Call(Caller c, Employee.Rank rank){
this.caller = c;
this.rank = rank;
this.callState = Call.CallState.READY;
}
public void setHandler(Employee handler) {
this.handler = handler;
}
}
Class: Employee
package com.spsoft.callcenter;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class Employee {
String empId;
String name;
public Rank rank;
volatile Call currentCall;
CallCenter callCenter;
private final AtomicBoolean isFree = new AtomicBoolean(true);
public void setCallCenter(CallCenter callCenter) {
this.callCenter = callCenter;
}
public enum Rank {
OPERATOR, SUPERVISOR, DIRECTOR
}
public Employee(String empId, String name, Rank rank) {
this.empId = empId;
this.name = name;
this.rank = rank;
}
abstract void escalateCall();
public void receiveCall(Call call) {
synchronized (this) {
this.setFree(false);
// Associate the call with this employee as call handler
call.handler = this;
this.currentCall = call;
currentCall.callState = Call.CallState.IN_PROGRESS;
}
processCall(call, call.handler);
// If issue is resolved employee will complete the call, otherwise will escalate
if (currentCall.caller.isIssueResolved()) {
completeCall();
} else {
currentCall.callState = Call.CallState.READY;
try {
escalateCall();
} catch (UnsupportedOperationException ex) {
System.out.println(ex.getMessage());
completeCall();
}
}
}
public void completeCall() {
synchronized (this) {
this.currentCall.callState = Call.CallState.COMPLETE;
System.out.println("Call completed with caller id: " + currentCall.caller.callerId + " Call handler: " + currentCall.handler.name + " Type: " + currentCall.handler.rank + " Call status: " + currentCall.callState);
this.currentCall.caller.setIssueResolved(true);
this.setFree(true);
}
}
public boolean isFree() {
return isFree.get() && currentCall == null;
}
boolean assignNewCall(Call call) {
receiveCall(call);
return true;
}
public void setFree(boolean free) {
this.isFree.set(free);
this.currentCall = null;
}
private void processCall(Call call, Employee employee) {
try {
System.out.println("Placing call with callerId: " + call.caller.callerId + " to call handler: " + employee.name + ", Type: " + call.handler.rank);
Thread.sleep(5000); // Simulate call processing time
} catch (InterruptedException e) {
// Restore the interrupt flag so callers can see that the thread was interrupted
Thread.currentThread().interrupt();
}
}
}
Class: Operator
package com.spsoft.callcenter;
public class Operator extends Employee {
public Operator(String empId, String name) {
super(empId, name, Rank.OPERATOR);
}
@Override
void escalateCall() {
synchronized (this) {
this.currentCall.rank = Rank.SUPERVISOR;
//put call in the queue
this.callCenter.addCallToQueue(currentCall);
this.setFree(true); //free the operator for next call
//notify all supervisor
}
}
}
Class: Supervisor
package com.spsoft.callcenter;
public class Supervisor extends Employee {
public Supervisor(String empId, String name) {
super(empId, name, Rank.SUPERVISOR);
}
@Override
void escalateCall() {
synchronized (this) {
this.currentCall.rank = Rank.DIRECTOR;
//put call in the queue.
this.callCenter.addCallToQueue(currentCall);
this.setFree(true); //free the supervisor for next call
//notify all Director
}
}
}
Class: Director
package com.spsoft.callcenter;
public class Director extends Employee {
public Director(String empId, String name){
super(empId,name, Employee.Rank.DIRECTOR);
}
@Override
void escalateCall() {
throw new UnsupportedOperationException("Director cannot escalate a call; they must handle it");
}
}
Class: CallCenter
package com.spsoft.callcenter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.SECONDS;
public class CallCenter {
String name;
private final Map<Employee.Rank, Queue<Call>> callQueues = new ConcurrentHashMap<>();
private final Map<Employee.Rank, List<Employee>> allEmployees = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Runnable assignCallFromQueue = this::checkAvailabilityAndAssign;
public CallCenter(String name, List<Employee> operators, List<Employee> supervisors, List<Employee> directors) {
callQueues.put(Employee.Rank.OPERATOR, new LinkedList<>());
callQueues.put(Employee.Rank.SUPERVISOR, new LinkedList<>());
callQueues.put(Employee.Rank.DIRECTOR, new LinkedList<>());
// Associate all employee to this call center
operators.forEach(o -> o.setCallCenter(this));
supervisors.forEach(s -> s.setCallCenter(this));
directors.forEach(d -> d.setCallCenter(this));
allEmployees.put(Employee.Rank.OPERATOR, operators);
allEmployees.put(Employee.Rank.SUPERVISOR, supervisors);
allEmployees.put(Employee.Rank.DIRECTOR, directors);
// Schedule assignCallFromQueue
scheduler.scheduleAtFixedRate(assignCallFromQueue, 5, 10, SECONDS);
}
// Adds a call to the respective call queue by call handler's rank
public void addCallToQueue(Call call) {
lock.lock();
try {
Queue<Call> callQueue = callQueues.get(call.rank);
callQueue.add(call);
System.out.println("Queue Size for rank: " + call.rank + " is: " + callQueue.size());
} finally {
lock.unlock();
}
}
/*
The handleCall method is designed to manage incoming calls by finding an available employee and assigning the call to them.
It aims to ensure that call processing is handled concurrently, allowing the system to handle multiple calls
efficiently without blocking new incoming calls.
*/
public void handleCall(Call call) {
Optional<Employee> employeeOptional = findAvailableEmployee(call.rank);
// Dispatch call in a new thread so that a new call is not blocked.
new Thread(() -> {
dispatchCall(call, employeeOptional);
}).start();
}
/*
The dispatchCall method is responsible for assigning a call to an available employee if one is present.
If no employee is available, it adds the call to a waiting queue.
Input Parameters:
Call call: The call that needs to be dispatched.
Optional<Employee> employeeOptional: An Optional that may contain an available employee to handle the call.
*/
private void dispatchCall(Call call, Optional<Employee> employeeOptional) {
employeeOptional.ifPresentOrElse(emp -> {
emp.assignNewCall(call);
System.out.println("Call :" + call.caller.callerId + " assigned to employee: " + emp.name + " rank: " + emp.rank);
}, () -> {
lock.lock();
try {
System.out.println("Adding call to queue. Call Handler Rank: " + call.rank + " Caller: " + call.caller.callerId);
addCallToQueue(call);
} finally {
lock.unlock();
}
});
}
private Optional<Employee> findAvailableEmployee(Employee.Rank rank) {
lock.lock();
try {
List<Employee> employees = allEmployees.get(rank);
Optional<Employee> employee = employees.stream()
.filter(Employee::isFree)
.findFirst();
employee.ifPresent(emp -> {
// This is the most important part, otherwise first employee in the list will be picked up by all threads.
emp.setFree(false);
});
return employee;
} finally {
lock.unlock();
}
}
/*
The checkAvailabilityAndAssign method is responsible for checking the call queues for each rank of employees and assigning queued calls to free employees
in a thread-safe manner.
By using a lock, it prevents concurrent modifications to the call queues and employees' availability status.
*/
private void checkAvailabilityAndAssign() {
lock.lock();
try {
callQueues.forEach((rank, callQueue) -> {
System.out.println("Checking call Queue for rank: " + rank.name() + " Queue Size: " + callQueue.size());
if (!callQueue.isEmpty()) {
List<Employee> employees = allEmployees.get(rank);
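for (Employee employee : employees) {
if (callQueue.isEmpty()) {
break;
}
if (employee.isFree()) {
Call call = callQueue.poll();
// Reserve the employee while the lock is held, then run the call on its own thread
// so the scheduler thread does not hold the lock for the duration of the call.
employee.setFree(false);
new Thread(() -> employee.assignNewCall(call)).start();
}
}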
}
});
} finally {
lock.unlock();
}
}
}
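One thing the CallCenter class above leaves open is how the scheduler is stopped. Executors.newScheduledThreadPool creates non-daemon threads, so a program using it keeps running until the executor is shut down. The following is a minimal sketch of one way to handle that, under the assumption that a stop method would be added; SchedulerLifecycleSketch, start and stop are hypothetical names and are not part of the code above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class showing only the scheduler lifecycle, not the call handling.
class SchedulerLifecycleSketch {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final AtomicInteger runs = new AtomicInteger();

    // Schedules a placeholder task; the real task would check the call queues.
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> runs.incrementAndGet(), 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Cancels future runs and waits up to one second for a run in progress.
    // Returns true if the executor terminated within that time.
    boolean stop() {
        scheduler.shutdown();
        try {
            return scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With the default executor policy, shutdown() cancels the periodic task, so stop() returns once any run in progress has finished.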
Let’s test the program with a test class.
Class: CallCenterTest
package com.spsoft.callcenter;
import java.util.ArrayList;
import java.util.List;
public class CallCenterTest {
public static void main(String[] args) {
List<Employee> operators = new ArrayList<>();
operators.add(new Operator("1","Bob"));
operators.add(new Operator("2","Alex"));
operators.add(new Operator("3","Tom"));
operators.add(new Operator("4","Tina"));
List<Employee> supervisors = new ArrayList<>();
supervisors.add(new Supervisor("5","Adam"));
supervisors.add(new Supervisor("6","Josh"));
List<Employee> directors = new ArrayList<>();
directors.add(new Director("7","John"));
directors.add(new Director("8","Jain"));
CallCenter callCenter = new CallCenter("Verizon Wireless", operators,supervisors,directors);
callCenter.handleCall(new Call(new Caller("1"), Employee.Rank.OPERATOR));
callCenter.handleCall(new Call(new Caller("2"), Employee.Rank.OPERATOR));
callCenter.handleCall(new Call(new Caller("3"), Employee.Rank.OPERATOR));
callCenter.handleCall(new Call(new Caller("4"), Employee.Rank.OPERATOR));
callCenter.handleCall(new Call(new Caller("5"), Employee.Rank.OPERATOR));
//Extra Credit
callCenter.handleCall(new Call(new Caller("6"), Employee.Rank.SUPERVISOR));
callCenter.handleCall(new Call(new Caller("7"), Employee.Rank.DIRECTOR));
}
}
Let’s look at the output below:
Detailed description of two important methods:
Method: checkAvailabilityAndAssign()
Purpose: The checkAvailabilityAndAssign method is responsible for checking the call queues for each rank of employees and assigning available calls to free employees.
Detailed Breakdown:
Lock Acquisition:
The method starts by acquiring a lock using lock.lock() to ensure that the operation is thread-safe.
Iterate Over Call Queues:
The method iterates over callQueues, which is a Map where the key is an Employee.Rank and the value is a Queue<Call>.
For each entry in the callQueues, it prints the rank and the size of the queue.
Check for Non-Empty Queues:
If the queue for a particular rank is not empty (!callQueue.isEmpty()), it proceeds to get the list of employees for that rank from allEmployees.
Assign Calls to Free Employees:
It iterates over the list of employees for the current rank.
For each employee, it checks if the employee is free and if the call queue is not empty (employee.isFree() && !callQueue.isEmpty()).
Assign Call:
If both conditions are met, it polls a call from the queue (callQueue.poll()) and assigns it to the employee (employee.assignNewCall(call)).
If the call queue becomes empty after polling, it breaks out of the loop (if (callQueue.isEmpty()) break;).
Lock Release:
The finally block ensures that the lock is always released by calling lock.unlock() even if an exception occurs.
Summary:
This method ensures that calls are assigned to available employees in a thread-safe manner. By using a lock, it prevents concurrent modifications to the call queues and employees’ availability status. It also prints useful debugging information about the call queues and assignments.
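The matching step described above can be sketched in isolation. This is a simplified model under stated assumptions, not the CallCenter code: callers and employees are plain strings, there is a single queue instead of one per rank, and QueueDrainSketch, enqueueCall, markFree and drain are hypothetical names. The lock is held only while calls are paired with employees; the pairs are returned so that the actual call handling can happen after the lock is released.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model: callers and employees are plain strings.
class QueueDrainSketch {
    private final Lock lock = new ReentrantLock();
    private final Queue<String> waitingCalls = new ArrayDeque<>();
    private final Queue<String> freeEmployees = new ArrayDeque<>();

    void enqueueCall(String callerId) {
        lock.lock();
        try {
            waitingCalls.add(callerId);
        } finally {
            lock.unlock();
        }
    }

    void markFree(String employeeName) {
        lock.lock();
        try {
            freeEmployees.add(employeeName);
        } finally {
            lock.unlock();
        }
    }

    // Pairs waiting calls with free employees in FIFO order while holding the lock.
    // Returns "callerId->employee" pairs for processing outside the lock.
    List<String> drain() {
        List<String> assignments = new ArrayList<>();
        lock.lock();
        try {
            while (!waitingCalls.isEmpty() && !freeEmployees.isEmpty()) {
                assignments.add(waitingCalls.poll() + "->" + freeEmployees.poll());
            }
        } finally {
            lock.unlock();
        }
        return assignments;
    }
}
```

A method like drain() could be passed to scheduleAtFixedRate in the same way the article schedules checkAvailabilityAndAssign.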
Method: dispatchCall(Call call, Optional<Employee> employeeOptional)
Purpose: The dispatchCall method is responsible for assigning a call to an available employee if one is present. If no employee is available, it adds the call to a waiting queue.
Detailed Breakdown:
Input Parameters:
Call call: The call that needs to be dispatched.
Optional<Employee> employeeOptional: An Optional that may contain an available employee to handle the call.
Handling the Employee Assignment:
ifPresentOrElse: This method on Optional executes one of two possible blocks of code:
If Present: If employeeOptional contains an employee (emp), the method assigns the call to this employee:
emp.assignNewCall(call);
System.out.println("Call :" + call.caller.callerId + " assigned to employee: " + emp.name + " rank: " + emp.rank);
assignNewCall: The call is assigned to the employee.
Logging: A message is printed to the console, indicating that the call has been assigned to the employee.
If Not Present: If employeeOptional is empty (no available employee), the method proceeds to add the call to the waiting queue:
lock.lock();
try {
System.out.println("Adding call to queue. Call Handler Rank: " + call.rank + " Caller: " + call.caller.callerId);
addCallToQueue(call);
} finally {
lock.unlock();
}
Summary:
The dispatchCall method efficiently handles call assignment:
If an available employee is found, it assigns the call to them and logs the assignment.
If no employee is available, it safely adds the call to the waiting queue with proper synchronization using a lock.
This method ensures both thread safety and efficient handling of incoming calls in the call center system.
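To see the ifPresentOrElse branching on its own, here is a minimal, single-threaded sketch. It assumes a simplified model in which callers and employees are plain strings and no locking is needed; DispatchSketch, dispatch, lastAssignment and waiting are hypothetical names, not part of the CallCenter class.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, single-threaded model of the two dispatch branches.
class DispatchSketch {
    final Queue<String> waiting = new ArrayDeque<>();
    String lastAssignment;

    // Assigns the call if an employee is present, otherwise queues the caller.
    void dispatch(String callerId, Optional<String> employee) {
        employee.ifPresentOrElse(
                name -> lastAssignment = callerId + "->" + name,
                () -> waiting.add(callerId));
    }
}
```

Calling dispatch with Optional.of("Bob") takes the first branch and records the assignment, while Optional.empty() takes the second branch and leaves the caller in the waiting queue.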