Design a thread-safe connection pool
Connection pool: a cache of reusable database connections, managed by the client or by middleware. It avoids the overhead of opening and closing a connection for every request, which improves performance and scalability in database applications.
Design a thread-safe connection pool that implements the following interface:
package com.spsoft.connectionpool;
import java.sql.Connection;
import java.sql.SQLException;
public interface ConnectionPool {
    /**
     * Returns a connection from this pool if one is available,
     * or throws SQLException when no connection is available.
     * @return connection from this pool
     * @throws SQLException thrown when no connection is available
     * @throws InterruptedException thrown when the calling thread is interrupted while waiting
     */
    public Connection getConnection() throws SQLException, InterruptedException;
    /**
     * Returns a connection from this pool if one is available,
     * otherwise waits for no more than timeout milliseconds to get a connection.
     * @param timeout timeout in milliseconds
     * @return connection from this pool if it becomes available within timeout milliseconds
     * @throws SQLException thrown when no connection becomes available within the timeout
     * @throws InterruptedException thrown when the calling thread is interrupted while waiting
     */
    public Connection getConnection(long timeout) throws SQLException, InterruptedException;
    /**
     * Returns a connection to the pool.
     * @param connection connection to be returned to the pool
     */
    public void releaseConnection(Connection connection);
    /**
     * Get the number of connections currently in the pool.
     */
    public int getConnectionsInPool();
    /**
     * Get the number of used connections (i.e., connections not currently in the pool).
     */
    public int getUsedConnections();
}
Constraints and assumptions
An instance of the connection pool implementation should accept an integer maximumPoolSize and must never create more than maximumPoolSize connections to the database.
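The cap in this constraint can be enforced in more than one way. The implementations in this article create all connections up front, so the cap holds by construction. As a contrast, here is a minimal sketch of enforcing the same cap when items are created lazily. It is an illustration only: CappedFactory, tryCreate and createdCount are hypothetical names, and a plain Object stands in for a database connection.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: creates at most maximumPoolSize items, even under contention.
class CappedFactory<T> {
    private final int maximumPoolSize;
    private final Supplier<T> creator;
    private final AtomicInteger created = new AtomicInteger();

    CappedFactory(int maximumPoolSize, Supplier<T> creator) {
        if (maximumPoolSize <= 0) {
            throw new IllegalArgumentException("maximumPoolSize must be positive");
        }
        this.maximumPoolSize = maximumPoolSize;
        this.creator = creator;
    }

    // Returns a new item, or null once maximumPoolSize items have been created.
    // Note: this sketch does not roll the counter back if creator.get() throws.
    T tryCreate() {
        while (true) {
            int current = created.get();
            if (current >= maximumPoolSize) {
                return null;
            }
            // Reserve a slot atomically before creating, so two threads cannot both take the last one.
            if (created.compareAndSet(current, current + 1)) {
                return creator.get();
            }
        }
    }

    int createdCount() {
        return created.get();
    }
}

class CappedFactoryDemo {
    // Eight threads each ask for ten items from a factory capped at three.
    static int run() {
        CappedFactory<Object> factory = new CappedFactory<>(3, Object::new);
        AtomicInteger handedOut = new AtomicInteger();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    if (factory.tryCreate() != null) {
                        handedOut.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handedOut.get();
    }

    public static void main(String[] args) {
        System.out.println("Items created: " + run());
    }
}
```

Eager creation, as used in the rest of this article, is simpler; a lazy scheme like this one trades that simplicity for not opening connections that are never needed.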
If you prefer video, this topic is also available on YouTube.
Implementation using a Java LinkedList as a queue:
package com.spsoft.connectionpool;
import java.sql.*;
import java.util.*;
public class ConnectionPoolUsingList implements ConnectionPool {
    private final String url;
    private final String user;
    private final String password;
    private final int poolSize;
    private Driver driver;
    // Not thread-safe by itself; every access happens inside a synchronized method
    private final Queue<Connection> connectionPool;
    public ConnectionPoolUsingList(String driverClassName, String url, String user, String password, int poolSize) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.poolSize = poolSize;
        this.connectionPool = new LinkedList<>();
        try {
            // Class.newInstance() is deprecated; go through the no-arg constructor instead
            this.driver = (Driver) Class.forName(driverClassName).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
        initializePool();
    }
    private void initializePool() {
        for (int i = 0; i < poolSize; i++) {
            try {
                Connection connection = DriverManager.getConnection(url, user, password);
                connectionPool.add(connection);
            } catch (SQLException e) {
                // Fail fast: a partially filled pool would make getUsedConnections() report wrong numbers
                throw new RuntimeException("Failed to create connection " + (i + 1) + " of " + poolSize, e);
            }
        }
    }
    @Override
    public Connection getConnection() throws SQLException, InterruptedException {
        // Default wait of 100 ms before giving up
        return this.getConnection(100);
    }
    @Override
    public synchronized Connection getConnection(long timeout) throws SQLException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeout;
        while (connectionPool.isEmpty()) {
            long waitTime = deadline - System.currentTimeMillis();
            if (waitTime <= 0) {
                throw new SQLException("Connection not available");
            }
            // Object.wait(long) releases this monitor and blocks until the thread is notified,
            // interrupted, or the wait time elapses. The loop re-checks the queue afterwards,
            // which also guards against spurious wakeups. An InterruptedException simply
            // propagates to the caller.
            wait(waitTime);
        }
        return connectionPool.poll();
    }
    @Override
    public synchronized void releaseConnection(Connection connection) {
        if (connection != null) {
            connectionPool.add(connection);
            // Wake all waiting threads so they re-check the queue
            notifyAll();
        }
    }
    /**
     * Get the number of connections currently in the pool.
     */
    @Override
    public synchronized int getConnectionsInPool() {
        return connectionPool.size();
    }
    /**
     * Get the number of used connections (i.e., connections not currently in the pool).
     */
    @Override
    public synchronized int getUsedConnections() {
        return poolSize - connectionPool.size();
    }
}
Some important methods/steps are described below:
Connection Pool Initialization:
The ConnectionPoolUsingList constructor loads the JDBC driver and initializes a pool of connections from the provided database URL, username, password, and pool size.
initializePool:
The initializePool method creates poolSize connections up front and adds them to the pool.
Get Connection:
The getConnection method retrieves a connection from the pool. If the pool is empty, it waits up to the given timeout for a connection to be released and throws SQLException if none becomes available in time.
Release Connection:
The releaseConnection method adds the connection back to the pool and calls notifyAll so that waiting threads re-check whether a connection is available.
getConnectionsInPool: Returns the number of connections currently available in the pool.
getUsedConnections: Returns the number of connections that are currently in use (not in the pool).
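The timed wait in getConnection and the notifyAll in releaseConnection can be tried without a database. The following is a minimal sketch of the same wait/notify pattern, not the article's class: TimedWaitPool, borrow and release are hypothetical names, strings stand in for connections, and borrow returns null on timeout where the article's code throws SQLException.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical generic pool; T stands in for java.sql.Connection.
class TimedWaitPool<T> {
    private final Queue<T> idle = new ArrayDeque<>();

    // Hands out an item, waiting at most timeoutMillis; returns null on timeout.
    synchronized T borrow(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (idle.isEmpty()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return null; // timed out
            }
            // Round up to whole milliseconds so wait(0), which waits forever, is never called.
            long remainingMillis = (remainingNanos + 999_999L) / 1_000_000L;
            wait(remainingMillis); // releases the monitor while waiting
        }
        return idle.poll();
    }

    synchronized void release(T item) {
        idle.add(item);
        notifyAll(); // waiters wake up and re-check the queue in their loop
    }
}

class TimedWaitPoolDemo {
    static String run() {
        TimedWaitPool<String> pool = new TimedWaitPool<>();
        pool.release("conn-1");
        try {
            String first = pool.borrow(50);   // available immediately
            String second = pool.borrow(50);  // pool is empty, so this times out
            Thread releaser = new Thread(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    return;
                }
                pool.release(first);
            });
            releaser.start();
            String third = pool.borrow(2000); // woken by the release above
            releaser.join();
            return first + "," + second + "," + third;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "conn-1,null,conn-1"
    }
}
```

The remaining time is recomputed on every pass through the loop, so a thread that wakes up and finds the queue still empty only waits for what is left of its original timeout.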
Implementation using a Java ArrayBlockingQueue as a queue:
Why BlockingQueue?
Justification:
Built-in Waiting Mechanisms:
BlockingQueue offers methods such as poll(long timeout, TimeUnit unit) which are specifically designed for handling waiting with timeouts. This eliminates the need for manually managing wait and notify.
Simplicity and Readability:
Utilizing BlockingQueue methods makes the code simpler and more readable, reducing the risk of errors associated with manual synchronization and wait/notify.
Efficiency:
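BlockingQueue implementations are designed for concurrent producers and consumers and handle locking and waiting internally. ArrayBlockingQueue, for example, signals a single waiting thread when an element is added instead of waking every waiter as notifyAll does, which avoids needless wakeups under contention.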
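The timeout behaviour described above can be seen in isolation with a few lines. This is a small sketch that uses strings in place of connections; BlockingQueueTimeoutDemo is a hypothetical class name, while offer and poll(long, TimeUnit) are standard BlockingQueue methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueTimeoutDemo {
    static String run() {
        // Capacity 2 plays the role of the pool size.
        BlockingQueue<String> idle = new ArrayBlockingQueue<>(2);
        idle.add("conn-1");
        idle.add("conn-2");
        // offer() does not block; it returns false because the queue is full.
        boolean acceptedThird = idle.offer("conn-3");
        try {
            String a = idle.poll(50, TimeUnit.MILLISECONDS);
            String b = idle.poll(50, TimeUnit.MILLISECONDS);
            // Queue is now empty: poll waits about 50 ms, then returns null.
            String c = idle.poll(50, TimeUnit.MILLISECONDS);
            return a + "," + b + "," + c + "," + acceptedThird;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "conn-1,conn-2,null,false"
    }
}
```

A null result from poll is the signal a pool can translate into "Connection not available".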
package com.spsoft.connectionpool;
import java.sql.*;
import java.util.concurrent.*;
public class ConnectionPoolUsingBlockingQueue implements ConnectionPool {
    private final String url;
    private final String user;
    private final String password;
    private final int poolSize;
    private Driver driver;
    // Thread-safe queue, so no synchronized methods are needed in this class
    private final BlockingQueue<Connection> connectionPool;
    public ConnectionPoolUsingBlockingQueue(String driverClassName, String url, String user, String password, int poolSize) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.poolSize = poolSize;
        this.connectionPool = new ArrayBlockingQueue<>(poolSize);
        try {
            // Class.newInstance() is deprecated; go through the no-arg constructor instead
            this.driver = (Driver) Class.forName(driverClassName).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
        initializePool();
    }
    private void initializePool() {
        for (int i = 0; i < poolSize; i++) {
            try {
                Connection connection = DriverManager.getConnection(url, user, password);
                connectionPool.add(connection);
            } catch (SQLException e) {
                // Fail fast: a partially filled pool would make getUsedConnections() report wrong numbers
                throw new RuntimeException("Failed to create connection " + (i + 1) + " of " + poolSize, e);
            }
        }
    }
    @Override
    public Connection getConnection() throws SQLException, InterruptedException {
        // Default wait of 10 ms before giving up
        return this.getConnection(10);
    }
    @Override
    public Connection getConnection(long timeout) throws SQLException, InterruptedException {
        // poll() waits up to the timeout and returns null if the queue is still empty
        Connection connection = connectionPool.poll(timeout, TimeUnit.MILLISECONDS);
        if (connection == null) {
            throw new SQLException("Connection not available");
        }
        return connection;
    }
    @Override
    public void releaseConnection(Connection connection) {
        // offer() never blocks. It returns false only when the queue is already full,
        // which means the connection was not borrowed from this pool or was released twice.
        if (connection != null && !connectionPool.offer(connection)) {
            throw new IllegalStateException("Pool is full; connection was not borrowed from this pool");
        }
    }
    /**
     * Get the number of connections currently in the pool.
     */
    @Override
    public int getConnectionsInPool() {
        return connectionPool.size();
    }
    /**
     * Get the number of used connections (i.e., connections not currently in the pool).
     */
    @Override
    public int getUsedConnections() {
        return poolSize - connectionPool.size();
    }
}
Test both implementations using the ConnectionPoolTest class below:
The test uses Java's virtual-thread-per-task executor to fire 1000 queries through a pool of 100 connections.
What is a Virtual Thread?
Like a platform thread, a virtual thread is an instance of java.lang.Thread, but it is not tied to a specific OS thread. A virtual thread still runs its code on an OS thread. When that code calls a blocking I/O operation, the Java runtime suspends the virtual thread until it can be resumed, and the OS thread it was running on becomes free to run other virtual threads.
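To make this concrete, here is a small sketch that runs many blocking tasks on virtual threads. It assumes Java 21 or later; VirtualThreadDemo is a hypothetical class name, and Thread.sleep stands in for a blocking database call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadDemo {
    // Submits `tasks` blocking tasks and returns how many completed on a virtual thread.
    static int run(int tasks) {
        AtomicInteger completedOnVirtual = new AtomicInteger();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    try {
                        // Blocking call: the virtual thread is suspended and its
                        // carrier OS thread is free to run other virtual threads.
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (Thread.currentThread().isVirtual()) {
                        completedOnVirtual.incrementAndGet();
                    }
                });
            }
        } // close() waits for all submitted tasks to finish
        return completedOnVirtual.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed on virtual threads: " + run(1000));
    }
}
```

Each task gets its own virtual thread, so 1000 concurrently sleeping tasks do not require 1000 OS threads.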
package com.spsoft.connectionpool;
import java.sql.*;
import java.util.concurrent.*;
import java.util.stream.IntStream;
public class ConnectionPoolTest {
    String url = "jdbc:mysql://localhost:3306/salesmanager";
    String driverClassName = "com.mysql.cj.jdbc.Driver";
    // Credentials are read from environment variables named "user" and "password"
    String user = System.getenv("user");
    String password = System.getenv("password");
    int poolSize = 100;
    ConnectionPool connectionPool = new ConnectionPoolUsingList(driverClassName, url, user, password, poolSize);
    //ConnectionPool connectionPool = new ConnectionPoolUsingBlockingQueue(driverClassName, url, user, password, poolSize);
    public void executeQueries() {
        // Leaving the try block closes the executor, which waits for all submitted tasks to finish
        try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, 1000).forEach(
                i -> executorService.submit(queryRunnable)
            );
        }
    }
    Runnable queryRunnable = () -> {
        Connection connection = null;
        try {
            // Get a connection from the pool, waiting up to 100 ms
            connection = connectionPool.getConnection(100);
            // Execute your SQL queries; the statement and result set are closed automatically
            try (Statement statement = connection.createStatement();
                 ResultSet rs = statement.executeQuery("select * from salesmanager.product")) {
                if (rs.next()) {
                    System.out.println("Product Id: " + rs.getInt(1));
                }
            }
            System.out.println("Thread: " + Thread.currentThread().threadId() + " Connections in Pool: " + connectionPool.getConnectionsInPool() + " Used Connections: " + connectionPool.getUsedConnections());
        } catch (SQLException | InterruptedException e) {
            System.out.println(e.getMessage());
        } finally {
            // Always release the connection back to the pool, even if the query failed
            if (connection != null) {
                connectionPool.releaseConnection(connection);
            }
        }
    };
    public static void main(String[] args) {
        ConnectionPoolTest test = new ConnectionPoolTest();
        test.executeQueries();
    }
}
Output of the class ConnectionPoolTest
GitHub link for Connection Pool
Author: Mohammad J Iqbal