多线程等待(超时)/通知的应用场景-数据库连接池

时间:2021-10-17 17:07:50

闲话少说,直接上代码:

package com.ilucky.test.jdk.util.concurrent.pool;

import java.sql.Connection;
import java.util.LinkedList;

/** * 数据库连接池 * @author IluckySi * */
public class ConnectionPool {

    private LinkedList<Connection> pool = new LinkedList<Connection>();

    /** * 初始化连接池 * @param initialSize */
    public ConnectionPool(int initialSize) {
        if(initialSize > 0) {
            for(int i=0; i<initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }

    /** * 如果mills时间内无法获取到连接,返回null * @param mills * @return * @throws InterruptedException */
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized(pool) {
            // 完全超时, 一直等待空闲链接.
            if(mills <= 0) {
                while(pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while(pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection connection = null;
                if(!pool.isEmpty()) {
                    connection = pool.removeFirst();
                }
                return connection;
            }
        }
    }

    /** * 释放连接 * @param connection */
    public void releaseConnection(Connection connection) {
        if(connection != null) {
            synchronized(pool) {
                pool.add(connection);
                // 释放连接后需要进行通知, 这样其他消费者能够感知到连接池中已经归还了一个链接。
                pool.notifyAll();
            }
        }
    }

}
package com.ilucky.test.jdk.util.concurrent.pool;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

/** * 模拟数据库连接驱动 * 即通过代理的方式,模拟向数据库提交数据需要100毫秒。 * @author IluckySi * */
public class ConnectionDriver {

    /** * 内部类 * 代理类关联的InvocationHandler。 * @author IluckySi * */
    static class ConnectionHandler implements InvocationHandler {

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if(method.getName().equals("commit")) {
                TimeUnit.MICROSECONDS.sleep(100);
            }
            return null;
        }
    }

    /** * 创建一个Connection的代理, 在commit时休眠100毫秒. * @return */
    public static final Connection createConnection() {
        return (Connection)Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
                new Class<?>[]{Connection.class}, new ConnectionHandler());
    }
}
package com.ilucky.test.jdk.util.concurrent.pool;

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** * 等待/通知机制. * 等待超时模式的应用场景数据库连接池 * @author IluckySi * */
public class ConnectionPoolTest {

    static ConnectionPool pool = new ConnectionPool(5);

    // 保证所有ConnectionRunner同时运行.
    static CountDownLatch prepare;
    static CountDownLatch finish;

    public static void main(String[] args) throws InterruptedException {
        // 线程数量, 可以通过修改线程数量进行观察.
        int threadCount = 200;
        prepare = new CountDownLatch(1);
        finish = new CountDownLatch(threadCount);

        // 每个线程发起20次获取数据库连接的请求.
        int count = 20;
        AtomicInteger got = new AtomicInteger(0);
        AtomicInteger notGot = new AtomicInteger(0);

        // 开始测试...
        for(int i=0; i<threadCount; i++) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
            thread.start();
        }
        prepare.countDown();
        finish.await();
        System.out.println("Total invoker:" + (threadCount * count));
        System.out.println("Got connection: " + got);
        System.out.println("NotGot connection: " + notGot);
    }

    static class ConnectionRunner implements Runnable {
        int count;
        AtomicInteger got;
        AtomicInteger notGot;

        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            try {
                prepare.await();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            while(count > 0) {
                try {
                    // 从线程池中获取连接, 如果1000ms内无法获取到, 将会返回null,
                    // 分别统计链接获取的数量got和未获取连接的数量notGot.
                    Connection connection = pool.fetchConnection(1000);
                    if(connection!= null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        } 
                    } else {
                        notGot.incrementAndGet();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }
            finish.countDown();
        }
    }
}
/** 注意: 可以通过修改线程数量和超时时间做测试。 -------------------------------10个连接的情况下----------------- Total invoker:400 Got connection: 400 NotGot connection: 0 Total invoker:2000 Got connection: 2000 NotGot connection: 0 Total invoker:4000 Got connection: 4000 NotGot connection: 0 Total invoker:8000 Got connection: 8000 NotGot connection: 0 Total invoker:10000 Got connection: 9989 NotGot connection: 11 Total invoker:20000 Got connection: 19204 NotGot connection: 796 Total invoker:200000 Got connection: 18013 NotGot connection: 181987 -------------------------------100个连接的情况下----------------- Total invoker:10000 Got connection: 10000 NotGot connection: 0 Total invoker:20000 Got connection: 20000 NotGot connection: 0 Total invoker:40000 Got connection: 40000 NotGot connection: 0 Total invoker:60000 Got connection: 59206 NotGot connection: 794 -------------------------------1000个连接的情况下----------------- Total invoker:60000 Got connection: 60000 NotGot connection: 0 Total invoker:100000 Got connection: 100000 NotGot connection: 0 Total invoker:160000 Got connection: 160000 NotGot connection: 0 Total invoker:200000 Got connection: 199987 NotGot connection: 13 */