描述:使用等待超时模式来构造一个简单的数据库连接池,在示例中模拟从连接池中获取、使用和释放连接的过程。客户端获取连接的过程被设计成等待超时模式
1 /* 2 连接池的定义。通过构造函数初始化连接的最大上限,通过双向队列维护连接。调用方需要先调用fetchConnection(long) 3 方法来指定在多少毫秒内超时获取连接,当连接使用完时,需要调用releaseConnection(Connection)方法来将来凝结放回 4 线程池 5 */ 6 class ConnectionPool{ 7 private LinkedList<Connection> pool = new LinkedList<Connection>(); 8 public ConnectionPool(int initialSize){ 9 if (initialSize > 0){ 10 for (int i = 0; i < initialSize; i++){ 11 pool.addLast(ConnectionDriver.creatConnection()); 12 } 13 } 14 } 15 public void releaseConnection(Connection connection){ 16 if(connection != null){ 17 synchronized (pool){ 18 //连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接 19 pool.addLast(connection);///? 20 pool.notifyAll(); 21 } 22 } 23 } 24 25 //在mills内无法获得连接,将会返回null 26 public Connection fetchConnection(long mills) throws InterruptedException{ 27 synchronized (pool){ 28 //完全超时 29 if (mills <= 0){ 30 while (pool.isEmpty()){ 31 pool.wait(); 32 } 33 return pool.removeFirst(); 34 }else { 35 long future = System.currentTimeMillis() + mills; 36 long remaining = mills; 37 while (pool.isEmpty() && remaining > 0){ 38 pool.wait(remaining); 39 remaining = future - System.currentTimeMillis(); 40 } 41 Connection result = null; 42 if (!pool.isEmpty()){ 43 result = pool.removeFirst(); 44 } 45 return result; 46 } 47 } 48 } 49 } 50 /* 51 由于java.sql.Connection是一个接口,最终实现是由数据库驱动提供方案来实现的,这里通过动态代理构造了一个Connection, 52 该Connection的代理实现仅仅实在commit方法调用时休眠100毫秒 53 */ 54 class ConnectionDriver{ 55 static class ConnectionHander implements InvocationHandler{ 56 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable{ 57 if (method.getName().equals("commit")){ 58 TimeUnit.MILLISECONDS.sleep(100); 59 } 60 return null; 61 } 62 } 63 //创建一个Connection的代理,在commit时休眠100毫秒 64 public static final Connection creatConnection(){ 65 return (Connection) Proxy.newProxyInstance(ConnectionHander.class.getClassLoader(), 66 new Class<?>[] {Connection.class},new ConnectionHander()); 67 } 68 } 69 70 /* 71 测试,客户端模拟ConnectionRunner获取、使用、释放连接的过程 72 */ 73 class ConnectionPoolTest{ 74 static ConnectionPool pool = new ConnectionPool(10); 75 //保证所有的ConnectionRunner能同时开始 76 static CountDownLatch start = new CountDownLatch(1); 77 //main线程将会等待所有ConnectionRunner结束后才能继续执行 78 static CountDownLatch end; 79 public static void main(String[] args) throws Exception{ 80 //线程数量,可以修改线程数量进行观察 81 int threadCount = 20; 82 end = new CountDownLatch(threadCount); 83 int count = 20; 84 AtomicInteger got = new AtomicInteger(); 85 AtomicInteger notGot = new AtomicInteger(); 86 for (int i = 0; i < threadCount; i++){ 87 Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread"); 88 thread.start(); 89 } 90 start.countDown();//只要计数器为0,那么线程就可以结束阻塞往下执行,进而保证线程同时开始 91 end.await(); 92 System.out.println("total invoke: " + (threadCount * count)); 93 System.out.println("got connection: " + got); 94 System.out.println("not got connection: " + notGot); 95 } 96 97 static class ConnectionRunner implements Runnable{ 98 int count; 99 AtomicInteger got; 100 AtomicInteger notGot; 101 public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot){ 102 this.count = count; 103 this.got = got; 104 this.notGot = notGot; 105 } 106 @Override 107 public void run(){ 108 try{ 109 start.await(); 110 }catch (Exception ex){ 111 } 112 while (count > 0){ 113 try { 114 //从线程池中获取连接,如果1000ms内无法获取到,将会返回null 115 //分别统计连接获取的数量got和未获取到的数量notGot 116 Connection connection = pool.fetchConnection(1000); 117 if (connection != null){ 118 try { 119 connection.createStatement(); 120 connection.commit(); 121 }finally { 122 pool.releaseConnection(connection); 123 got.getAndIncrement(); 124 } 125 }else { 126 notGot.getAndIncrement(); 127 } 128 }catch (Exception ex){ 129 }finally { 130 count--; 131 } 132 } 133 end.countDown(); 134 } 135 } 136 }
输出结果:
1 total invoke: 400 2 got connection: 398 3 not got connection: 2